1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008, 2009, 2010, 2011 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/psql-reader.h"
25 #include "data/calendar.h"
26 #include "data/casereader-provider.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/variable.h"
30 #include "libpspp/i18n.h"
31 #include "libpspp/message.h"
32 #include "libpspp/misc.h"
33 #include "libpspp/str.h"
35 #include "gl/xalloc.h"
36 #include "gl/minmax.h"
39 #define _(msgid) gettext (msgid)
40 #define N_(msgid) (msgid)
45 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
47 msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
58 /* Default width of string variables. */
59 #define PSQL_DEFAULT_WIDTH 8
61 /* These macros must match the definitions in catalog/pg_type.h in the PostgreSQL source. */
74 #define BPCHAROID 1042
75 #define VARCHAROID 1043
78 #define TIMESTAMPOID 1114
79 #define TIMESTAMPTZOID 1184
80 #define INTERVALOID 1186
81 #define TIMETZOID 1266
82 #define NUMERICOID 1700
84 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
86 static struct ccase *psql_casereader_read (struct casereader *, void *);
88 static const struct casereader_class psql_casereader_class =
91 psql_casereader_destroy,
102 bool integer_datetimes;
104 double postgres_epoch;
106 struct caseproto *proto;
107 struct dictionary *dict;
109 /* An array of variable pointers, which maps psql column numbers into
111 struct variable **vmap;
114 struct string fetch_cmd;
119 static struct ccase *set_value (struct psql_reader *r);
125 data_to_native (const void *in_, void *out_, int len)
128 const unsigned char *in = in_;
129 unsigned char *out = out_;
130 for (i = 0 ; i < len ; ++i )
135 data_to_native (const void *in_, void *out_, int len)
138 const unsigned char *in = in_;
139 unsigned char *out = out_;
140 for (i = 0 ; i < len ; ++i )
141 out[len - i - 1] = in[i];
/* GET_VALUE (IN, OUT): passes sizeof (OUT) bytes, starting at the
   address *IN, through data_to_native () and stores the result in OUT.
   IN is a pointer to the read pointer, and OUT must be an lvalue, since
   its address is taken; sizeof (OUT) determines how many bytes are
   converted. */
146 #define GET_VALUE(IN, OUT) do { \
147 size_t sz = sizeof (OUT); \
148 data_to_native (*(IN), &(OUT), sz) ; \
155 dump (const unsigned char *x, int l)
159 for (i = 0; i < l ; ++i)
161 printf ("%02x ", x[i]);
166 for (i = 0; i < l ; ++i)
169 printf ("%c ", x[i]);
/* Creates a variable of the given WIDTH in R's dictionary and applies
   FMT as both of its formats.  The variable's name is obtained from
   SUGGESTED_NAME via dict_make_unique_var_name ().  COL is the column
   number used to size R's column-to-variable map (r->vmap).
   NOTE(review): some callers pass COL == -1 for auxiliary variables;
   confirm how that case is handled before relying on the map size. */
178 static struct variable *
179 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
180 int width, const char *suggested_name, int col)
182 unsigned long int vx = 0;
183 struct variable *var;
186 name = dict_make_unique_var_name (r->dict, suggested_name, &vx);
187 var = dict_create_var (r->dict, name, width);
190 var_set_both_formats (var, fmt);
/* Resize the column-to-variable map to COL + 1 entries and record
   the new length. */
194 r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
197 r->vmapsize = col + 1;
208 reload_cache (struct psql_reader *r)
213 r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
215 if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
227 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
230 int n_fields, n_tuples;
231 PGresult *qres = NULL;
232 casenumber n_cases = CASENUMBER_MAX;
233 const char *encoding;
235 struct psql_reader *r = xzalloc (sizeof *r);
236 struct string query ;
238 r->conn = PQconnectdb (info->conninfo);
239 if ( NULL == r->conn)
241 msg (ME, _("Memory error whilst opening psql source"));
245 if ( PQstatus (r->conn) != CONNECTION_OK )
247 msg (ME, _("Error opening psql source: %s."),
248 PQerrorMessage (r->conn));
255 const char *vers = PQparameterStatus (r->conn, "server_version");
257 sscanf (vers, "%d", &ver_num);
262 _("Postgres server is version %s."
263 " Reading from versions earlier than 8.0 is not supported."),
271 const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
273 r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
277 if ( PQgetssl (r->conn) == NULL)
280 if (! info->allow_clear)
282 msg (ME, _("Connection is unencrypted, "
283 "but unencrypted connections have not been permitted."));
288 r->postgres_epoch = calendar_gregorian_to_offset (2000, 1, 1, NULL);
291 const int enc = PQclientEncoding (r->conn);
293 /* According to section 22.2 of the Postgresql manual
294 a value of zero (SQL_ASCII) indicates
295 "a declaration of ignorance about the encoding".
296 Accordingly, we use the default encoding
297 if we find this value.
299 encoding = enc ? pg_encoding_to_char (enc) : get_default_encoding ();
301 /* Create the dictionary and populate it */
302 *dict = r->dict = dict_create (encoding);
306 select count (*) from (select * from medium) stupid_sql_standard;
308 ds_init_cstr (&query,
309 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
310 "DECLARE pspp BINARY CURSOR FOR ");
312 ds_put_substring (&query, info->sql.ss);
314 qres = PQexec (r->conn, ds_cstr (&query));
316 if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
318 msg (ME, _("Error from psql source: %s."),
319 PQresultErrorMessage (qres));
326 /* Now use the count() function to find the total number of cases
327 that this query returns.
328 Doing this incurs some overhead: the server has to iterate over every
329 case in order to find this number. However, it is performed on the
330 server side, and for all but the very largest databases the extra
331 overhead will be worth the effort.
332 On the other hand, most PSPP functions don't need to know this.
333 The GUI is the notable exception.
335 ds_init_cstr (&query, "SELECT count (*) FROM (");
336 ds_put_substring (&query, info->sql.ss);
337 ds_put_cstr (&query, ") stupid_sql_standard");
339 qres = PQexec (r->conn, ds_cstr (&query));
341 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
343 msg (ME, _("Error from psql source: %s."),
344 PQresultErrorMessage (qres));
347 n_cases = atol (PQgetvalue (qres, 0, 0));
350 qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
351 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
353 msg (ME, _("Error from psql source: %s."),
354 PQresultErrorMessage (qres));
358 n_tuples = PQntuples (qres);
359 n_fields = PQnfields (qres);
365 for (i = 0 ; i < n_fields ; ++i )
367 struct variable *var;
368 struct fmt_spec fmt = {FMT_F, 8, 2};
369 Oid type = PQftype (qres, i);
373 /* If there are no data, then make a rough guess
374 at the contents. */
376 length = PQgetlength (qres, 0, i);
378 length = PSQL_DEFAULT_WIDTH;
392 fmt.type = FMT_DOLLAR;
396 width = length > 0 ? length : 1;
404 width = (info->str_width == -1) ?
405 ROUND_UP (length, PSQL_DEFAULT_WIDTH) : info->str_width;
411 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
416 fmt.type = FMT_DTIME;
436 fmt.type = FMT_DATETIME;
448 msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
450 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
456 if ( width == 0 && fmt_is_string (fmt.type))
457 fmt.w = width = PSQL_DEFAULT_WIDTH;
460 var = create_var (r, &fmt, width, PQfname (qres, i), i);
461 if ( type == NUMERICOID && n_tuples > 0)
463 const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
465 int16_t n_digits, weight, dscale;
468 GET_VALUE (&vptr, n_digits);
469 GET_VALUE (&vptr, weight);
470 GET_VALUE (&vptr, sign);
471 GET_VALUE (&vptr, dscale);
475 fmt.w = fmt_max_output_width (fmt.type) ;
476 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
477 var_set_both_formats (var, &fmt);
480 /* Timezones need an extra variable */
486 ds_init_cstr (&name, var_get_name (var));
487 ds_put_cstr (&name, "-zone");
492 create_var (r, &fmt, 0, ds_cstr (&name), -1);
501 ds_init_cstr (&name, var_get_name (var));
502 ds_put_cstr (&name, "-months");
507 create_var (r, &fmt, 0, ds_cstr (&name), -1);
518 qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
519 if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
526 r->cache_size = info->bsize != -1 ? info->bsize: 4096;
528 ds_init_empty (&r->fetch_cmd);
529 ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size);
532 r->proto = caseproto_ref (dict_get_proto (*dict));
534 return casereader_create_sequential
538 &psql_casereader_class, r);
541 dict_destroy (*dict);
543 psql_casereader_destroy (NULL, r);
549 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
551 struct psql_reader *r = r_;
555 ds_destroy (&r->fetch_cmd);
557 if (r->res) PQclear (r->res);
559 caseproto_unref (r->proto);
/* Produces the next case from the reader state R_, which is a
   struct psql_reader.  When no result is cached, or the position
   r->tuple has reached r->cache_size, reload_cache () is called first;
   the case itself is built by set_value ().  READER is unused. */
566 static struct ccase *
567 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
569 struct psql_reader *r = r_;
/* No cached result, or the position has reached the cache size. */
571 if ( NULL == r->res || r->tuple >= r->cache_size)
573 if ( ! reload_cache (r) )
577 return set_value (r);
580 static struct ccase *
581 set_value (struct psql_reader *r)
589 n_vars = PQnfields (r->res);
591 if ( r->tuple >= PQntuples (r->res))
594 c = case_create (r->proto);
595 case_set_missing (c);
598 for (i = 0 ; i < n_vars ; ++i )
600 Oid type = PQftype (r->res, i);
601 const struct variable *v = r->vmap[i];
602 union value *val = case_data_rw (c, v);
604 union value *val1 = NULL;
611 if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
613 const struct variable *v1 = NULL;
614 v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
616 val1 = case_data_rw (c, v1);
624 if (PQgetisnull (r->res, r->tuple, i))
626 value_set_missing (val, var_get_width (v));
641 const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
642 int length = PQgetlength (r->res, r->tuple, i);
644 int var_width = var_get_width (v);
650 GET_VALUE (&vptr, x);
659 GET_VALUE (&vptr, x);
667 GET_VALUE (&vptr, x);
675 GET_VALUE (&vptr, x);
683 GET_VALUE (&vptr, n);
691 GET_VALUE (&vptr, n);
698 /* Postgres 8.3 uses 64 bits.
699 Earlier versions use 32 */
705 GET_VALUE (&vptr, x);
712 GET_VALUE (&vptr, x);
725 if ( r->integer_datetimes )
732 GET_VALUE (&vptr, things);
733 GET_VALUE (&vptr, us);
734 GET_VALUE (&vptr, days);
735 GET_VALUE (&vptr, months);
737 val->f = us / 1000000.0;
738 val->f += days * 24 * 3600;
744 uint32_t days, months;
747 GET_VALUE (&vptr, seconds);
748 GET_VALUE (&vptr, days);
749 GET_VALUE (&vptr, months);
752 val->f += days * 24 * 3600;
763 GET_VALUE (&vptr, x);
765 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
771 if ( r->integer_datetimes)
774 GET_VALUE (&vptr, x);
775 val->f = x / 1000000.0;
780 GET_VALUE (&vptr, x);
789 if ( r->integer_datetimes)
794 GET_VALUE (&vptr, x);
795 val->f = x / 1000000.0;
801 GET_VALUE (&vptr, x);
805 GET_VALUE (&vptr, zone);
806 val1->f = zone / 3600.0;
813 if ( r->integer_datetimes)
817 GET_VALUE (&vptr, x);
821 val->f = (x + r->postgres_epoch * 24 * 3600 );
827 GET_VALUE (&vptr, x);
829 val->f = (x + r->postgres_epoch * 24 * 3600 );
837 memcpy (value_str_rw (val, var_width), vptr,
838 MIN (length, var_width));
845 int16_t n_digits, weight, dscale;
848 GET_VALUE (&vptr, n_digits);
849 GET_VALUE (&vptr, weight);
850 GET_VALUE (&vptr, sign);
851 GET_VALUE (&vptr, dscale);
858 fmt.w = fmt_max_output_width (fmt.type) ;
859 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
860 var_set_both_formats (v, &fmt);
864 for (i = 0 ; i < n_digits; ++i)
867 GET_VALUE (&vptr, x);
868 f += x * pow (10000, weight--);