1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/psql-reader.h"
25 #include "data/calendar.h"
26 #include "data/casereader-provider.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/variable.h"
30 #include "libpspp/i18n.h"
31 #include "libpspp/message.h"
32 #include "libpspp/misc.h"
33 #include "libpspp/str.h"
35 #include "gl/c-strcase.h"
36 #include "gl/minmax.h"
37 #include "gl/xalloc.h"
40 #define _(msgid) gettext (msgid)
41 #define N_(msgid) (msgid)
/* Fallback definition of psql_open_reader.  Both parameters are marked
   UNUSED; the only visible action is reporting, via msg(), that support
   for reading postgres databases was not compiled in.
   NOTE(review): the return statement and the preprocessor condition that
   selects this definition are not visible in this excerpt -- confirm. */
46 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
48 msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
59 /* Default width of string variables. */
60 #define PSQL_DEFAULT_WIDTH 8
62 /* These macros must be the same as in catalog/pg_type.h from the postgres source */
75 #define BPCHAROID 1042
76 #define VARCHAROID 1043
79 #define TIMESTAMPOID 1114
80 #define TIMESTAMPTZOID 1184
81 #define INTERVALOID 1186
82 #define TIMETZOID 1266
83 #define NUMERICOID 1700
/* Forward declarations of the casereader callbacks implemented later in
   this file. */
85 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
87 static struct ccase *psql_casereader_read (struct casereader *, void *);
/* Casereader class for this reader.  Only the psql_casereader_destroy
   member of the initializer is visible in this excerpt. */
89 static const struct casereader_class psql_casereader_class =
92 psql_casereader_destroy,
/* Fields of the reader state (the opening line of the struct and several
   members are not visible in this excerpt). */
/* True when the server reports its integer_datetimes parameter as on;
   selects how time, timestamp and interval values are decoded. */
103 bool integer_datetimes;
/* Result of calendar_gregorian_to_offset for 1 January 2000; added to
   date and timestamp values when they are decoded. */
105 double postgres_epoch;
/* Reference to the case prototype of the dictionary; used to create
   each case returned to the caller. */
107 struct caseproto *proto;
/* Dictionary that is created and populated when the reader is opened. */
108 struct dictionary *dict;
110 /* An array of ints, which maps psql column numbers into
112 struct variable **vmap;
115 struct string fetch_cmd;
120 static struct ccase *set_value (struct psql_reader *r);
/* First of two definitions of data_to_native: walks LEN bytes of IN_ and
   OUT_.  The loop body is not visible in this excerpt.
   NOTE(review): presumably the straight-copy variant used when the host
   byte order already matches the wire format -- confirm against the
   surrounding preprocessor condition. */
126 data_to_native (const void *in_, void *out_, int len)
129 const unsigned char *in = in_;
130 unsigned char *out = out_;
131 for (i = 0 ; i < len ; ++i )
/* Second definition of data_to_native: copies LEN bytes from IN_ to OUT_
   in reverse order, so that byte I of the input becomes byte
   LEN - 1 - I of the output.  IN_ and OUT_ are treated as arrays of
   unsigned char. */
136 data_to_native (const void *in_, void *out_, int len)
139 const unsigned char *in = in_;
140 unsigned char *out = out_;
141 for (i = 0 ; i < len ; ++i )
/* Mirror the byte position. */
142 out[len - i - 1] = in[i];
/* GET_VALUE (IN, OUT): converts sizeof (OUT) bytes found at *IN into OUT
   using data_to_native.  IN is a pointer to the read pointer.
   NOTE(review): the end of the macro is not visible in this excerpt;
   callers invoke it repeatedly on the same pointer to read successive
   fields, so it presumably also advances *IN by sizeof (OUT) -- confirm. */
147 #define GET_VALUE(IN, OUT) do { \
148 size_t sz = sizeof (OUT); \
149 data_to_native (*(IN), &(OUT), sz) ; \
/* Debugging aid: prints the L bytes starting at X twice, first as
   two-digit hexadecimal values and then as characters, each followed by
   a space. */
156 dump (const unsigned char *x, int l)
/* Hexadecimal pass. */
160 for (i = 0; i < l ; ++i)
162 printf ("%02x ", x[i]);
/* Character pass over the same bytes. */
167 for (i = 0; i < l ; ++i)
170 printf ("%c ", x[i]);
/* Creates a variable of the given WIDTH in R's dictionary, named with a
   unique name derived from SUGGESTED_NAME, and gives it FMT as both of
   its formats.  R->vmap is resized to COL + 1 entries and R->vmapsize is
   updated to match.
   NOTE(review): callers in this file also pass COL == -1 for auxiliary
   variables; any guard around the resize for that case, the store into
   the map, and the return statement are not visible in this excerpt --
   confirm. */
179 static struct variable *
180 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
181 int width, const char *suggested_name, int col)
183 unsigned long int vx = 0;
184 struct variable *var;
/* Ask the dictionary for a name that does not clash with existing
   variables, then create the variable under that name. */
187 name = dict_make_unique_var_name (r->dict, suggested_name, &vx);
188 var = dict_create_var (r->dict, name, width);
191 var_set_both_formats (var, fmt);
/* Grow the column map so that index COL is valid. */
195 r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
198 r->vmapsize = col + 1;
/* Runs R's stored fetch command on R's connection and keeps the result
   in R->res.
   NOTE(review): the return type and the handling of the failure branch
   are not visible in this excerpt; the caller tests the result as a
   boolean. */
209 reload_cache (struct psql_reader *r)
214 r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
/* True when the fetch did not succeed or returned no tuples. */
216 if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
/* Opens the PostgreSQL source described by INFO and returns a sequential
   casereader over the rows produced by INFO's SQL query.

   The visible steps are: connect using INFO->conninfo; check the server
   version, the integer_datetimes setting and whether the connection is
   encrypted; create a dictionary (stored in *DICT) in the client
   encoding; declare a binary cursor named pspp for the query; count the
   rows; fetch the first row and create one variable per result column;
   move the cursor back one row; and prepare the FETCH FORWARD command
   used to refill the cache.

   Errors are reported with msg().  The last two lines destroy *DICT and
   the reader state.
   NOTE(review): many short lines (braces, case labels, else branches,
   gotos, comment terminators) are missing from this excerpt, so the
   exact control flow cannot be confirmed from here. */
228 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
231 int n_fields, n_tuples;
232 PGresult *qres = NULL;
233 casenumber n_cases = CASENUMBER_MAX;
234 const char *encoding;
236 struct psql_reader *r = xzalloc (sizeof *r);
237 struct string query ;
/* Connect to the server.  A NULL connection object is reported as a
   memory error; any status other than CONNECTION_OK is reported with
   the connection's own error message. */
239 r->conn = PQconnectdb (info->conninfo);
240 if ( NULL == r->conn)
242 msg (ME, _("Memory error whilst opening psql source"));
246 if ( PQstatus (r->conn) != CONNECTION_OK )
248 msg (ME, _("Error opening psql source: %s."),
249 PQerrorMessage (r->conn));
/* Read the leading integer of the server version string; the message
   below is for servers older than 8.0.
   NOTE(review): PQparameterStatus can return NULL and no NULL check
   before sscanf is visible in this excerpt -- confirm. */
256 const char *vers = PQparameterStatus (r->conn, "server_version");
258 sscanf (vers, "%d", &ver_num);
263 _("Postgres server is version %s."
264 " Reading from versions earlier than 8.0 is not supported."),
/* Remember whether the server reports integer_datetimes as on
   (compared without regard to case). */
272 const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
274 r->integer_datetimes = ( 0 == c_strcasecmp (dt, "on"));
/* A NULL SSL handle means the connection is not encrypted; that is an
   error unless INFO->allow_clear is set. */
278 if ( PQgetssl (r->conn) == NULL)
281 if (! info->allow_clear)
283 msg (ME, _("Connection is unencrypted, "
284 "but unencrypted connections have not been permitted."));
/* Offset of 1 January 2000, used later when decoding dates and
   timestamps. */
289 r->postgres_epoch = calendar_gregorian_to_offset (2000, 1, 1, NULL);
292 const int enc = PQclientEncoding (r->conn);
294 /* According to section 22.2 of the Postgresql manual
295 a value of zero (SQL_ASCII) indicates
296 "a declaration of ignorance about the encoding".
297 Accordingly, we use the default encoding
298 if we find this value.
300 encoding = enc ? pg_encoding_to_char (enc) : get_default_encoding ();
302 /* Create the dictionary and populate it */
303 *dict = r->dict = dict_create (encoding);
307 select count (*) from (select * from medium) stupid_sql_standard;
/* Open a read-only, serializable transaction and declare the binary
   cursor pspp over the query text from INFO, which is appended as is. */
309 ds_init_cstr (&query,
310 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
311 "DECLARE pspp BINARY CURSOR FOR ");
313 ds_put_substring (&query, info->sql.ss);
315 qres = PQexec (r->conn, ds_cstr (&query));
317 if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
319 msg (ME, _("Error from psql source: %s."),
320 PQresultErrorMessage (qres));
327 /* Now use the count() function to find the total number of cases
328 that this query returns.
329 Doing this incurs some overhead. The server has to iterate every
330 case in order to find this number. However, it's performed on the
331 server side, and in all except the most huge databases the extra
332 overhead will be worth the effort.
333 On the other hand, most PSPP functions don't need to know this.
334 The GUI is the notable exception.
336 ds_init_cstr (&query, "SELECT count (*) FROM (");
337 ds_put_substring (&query, info->sql.ss);
338 ds_put_cstr (&query, ") stupid_sql_standard");
340 qres = PQexec (r->conn, ds_cstr (&query));
342 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
344 msg (ME, _("Error from psql source: %s."),
345 PQresultErrorMessage (qres));
348 n_cases = atol (PQgetvalue (qres, 0, 0));
351 qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
352 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
354 msg (ME, _("Error from psql source: %s."),
355 PQresultErrorMessage (qres));
359 n_tuples = PQntuples (qres);
360 n_fields = PQnfields (qres);
366 for (i = 0 ; i < n_fields ; ++i )
368 struct variable *var;
369 struct fmt_spec fmt = {FMT_F, 8, 2};
370 Oid type = PQftype (qres, i);
374 /* If there are no data then make a finger in the air
375 guess at the contents */
377 length = PQgetlength (qres, 0, i);
379 length = PSQL_DEFAULT_WIDTH;
/* The following assignments belong to a per-type selection of format
   and width whose case labels are not visible in this excerpt. */
393 fmt.type = FMT_DOLLAR;
397 width = length > 0 ? length : 1;
405 width = (info->str_width == -1) ?
406 ROUND_UP (length, PSQL_DEFAULT_WIDTH) : info->str_width;
412 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
417 fmt.type = FMT_DTIME;
437 fmt.type = FMT_DATETIME;
449 msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
451 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
/* A string format with zero width falls back to the default width. */
457 if ( width == 0 && fmt_is_string (fmt.type))
458 fmt.w = width = PSQL_DEFAULT_WIDTH;
/* Create the variable for result column I. */
461 var = create_var (r, &fmt, width, PQfname (qres, i), i);
/* For a NUMERIC column with at least one row, decode the header of the
   first value and use its display scale, capped at the maximum the
   format allows, as the number of decimals. */
462 if ( type == NUMERICOID && n_tuples > 0)
464 const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
466 int16_t n_digits, weight, dscale;
469 GET_VALUE (&vptr, n_digits);
470 GET_VALUE (&vptr, weight);
471 GET_VALUE (&vptr, sign);
472 GET_VALUE (&vptr, dscale);
476 fmt.w = fmt_max_output_width (fmt.type) ;
477 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
478 var_set_both_formats (var, &fmt);
481 /* Timezones need an extra variable */
487 ds_init_cstr (&name, var_get_name (var));
488 ds_put_cstr (&name, "-zone");
/* The extra variable is created with a column index of -1. */
493 create_var (r, &fmt, 0, ds_cstr (&name), -1);
502 ds_init_cstr (&name, var_get_name (var));
503 ds_put_cstr (&name, "-months");
/* Likewise created with a column index of -1. */
508 create_var (r, &fmt, 0, ds_cstr (&name), -1);
/* Move the cursor back one row after the FETCH FIRST issued above. */
519 qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
520 if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
/* Number of rows requested per fetch: INFO->bsize, or 4096 when it
   is -1. */
527 r->cache_size = info->bsize != -1 ? info->bsize: 4096;
529 ds_init_empty (&r->fetch_cmd);
530 ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size);
/* Take a reference to the prototype of the finished dictionary. */
533 r->proto = caseproto_ref (dict_get_proto (*dict));
535 return casereader_create_sequential
539 &psql_casereader_class, r);
/* Cleanup after the return above: destroy the dictionary and release
   the reader state.
   NOTE(review): presumably the target of the error paths; the label is
   not visible in this excerpt. */
542 dict_destroy (*dict);
544 psql_casereader_destroy (NULL, r);
/* Releases the reader state passed as R_.  READER is unused.  The steps
   visible in this excerpt destroy the stored fetch command, clear the
   cached query result if there is one, and drop the reference to the
   case prototype.
   NOTE(review): other cleanup steps may exist on lines that are not
   visible here. */
550 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
552 struct psql_reader *r = r_;
556 ds_destroy (&r->fetch_cmd);
/* Only clear a result that was actually fetched. */
558 if (r->res) PQclear (r->res);
560 caseproto_unref (r->proto);
/* Casereader read callback.  READER is unused; R_ is the reader state.
   Refills the cache when needed and then returns whatever set_value
   produces for the current tuple. */
567 static struct ccase *
568 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
570 struct psql_reader *r = r_;
/* Reload when nothing has been fetched yet or when the tuple index has
   reached the cache size. */
572 if ( NULL == r->res || r->tuple >= r->cache_size)
/* The handling of a failed reload is not visible in this excerpt. */
574 if ( ! reload_cache (r) )
578 return set_value (r);
581 static struct ccase *
582 set_value (struct psql_reader *r)
590 n_vars = PQnfields (r->res);
592 if ( r->tuple >= PQntuples (r->res))
595 c = case_create (r->proto);
596 case_set_missing (c);
599 for (i = 0 ; i < n_vars ; ++i )
601 Oid type = PQftype (r->res, i);
602 const struct variable *v = r->vmap[i];
603 union value *val = case_data_rw (c, v);
605 union value *val1 = NULL;
612 if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
614 const struct variable *v1 = NULL;
615 v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
617 val1 = case_data_rw (c, v1);
625 if (PQgetisnull (r->res, r->tuple, i))
627 value_set_missing (val, var_get_width (v));
642 const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
643 int length = PQgetlength (r->res, r->tuple, i);
645 int var_width = var_get_width (v);
651 GET_VALUE (&vptr, x);
660 GET_VALUE (&vptr, x);
668 GET_VALUE (&vptr, x);
676 GET_VALUE (&vptr, x);
684 GET_VALUE (&vptr, n);
692 GET_VALUE (&vptr, n);
699 /* Postgres 8.3 uses 64 bits.
700 Earlier versions use 32 */
706 GET_VALUE (&vptr, x);
713 GET_VALUE (&vptr, x);
726 if ( r->integer_datetimes )
733 GET_VALUE (&vptr, things);
734 GET_VALUE (&vptr, us);
735 GET_VALUE (&vptr, days);
736 GET_VALUE (&vptr, months);
738 val->f = us / 1000000.0;
739 val->f += days * 24 * 3600;
745 uint32_t days, months;
748 GET_VALUE (&vptr, seconds);
749 GET_VALUE (&vptr, days);
750 GET_VALUE (&vptr, months);
753 val->f += days * 24 * 3600;
764 GET_VALUE (&vptr, x);
766 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
772 if ( r->integer_datetimes)
775 GET_VALUE (&vptr, x);
776 val->f = x / 1000000.0;
781 GET_VALUE (&vptr, x);
790 if ( r->integer_datetimes)
795 GET_VALUE (&vptr, x);
796 val->f = x / 1000000.0;
802 GET_VALUE (&vptr, x);
806 GET_VALUE (&vptr, zone);
807 val1->f = zone / 3600.0;
814 if ( r->integer_datetimes)
818 GET_VALUE (&vptr, x);
822 val->f = (x + r->postgres_epoch * 24 * 3600 );
828 GET_VALUE (&vptr, x);
830 val->f = (x + r->postgres_epoch * 24 * 3600 );
838 memcpy (value_str_rw (val, var_width), vptr,
839 MIN (length, var_width));
846 int16_t n_digits, weight, dscale;
849 GET_VALUE (&vptr, n_digits);
850 GET_VALUE (&vptr, weight);
851 GET_VALUE (&vptr, sign);
852 GET_VALUE (&vptr, dscale);
859 fmt.w = fmt_max_output_width (fmt.type) ;
860 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
861 var_set_both_formats (v, &fmt);
865 for (i = 0 ; i < n_digits; ++i)
868 GET_VALUE (&vptr, x);
869 f += x * pow (10000, weight--);