1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader-provider.h>
20 #include <libpspp/message.h>
21 #include <gl/xalloc.h>
22 #include <data/dictionary.h>
25 #include "psql-reader.h"
31 #include <libpspp/str.h>
/* Message-translation shorthands: _() passes its argument through
   gettext(); N_() expands to its argument unchanged. */
34 #define _(msgid) gettext (msgid)
35 #define N_(msgid) (msgid)
/* Variant of psql_open_reader whose visible body only reports, via
   msg (ME, ...), that PostgreSQL support was not compiled into this
   build; both parameters are marked UNUSED.
   NOTE(review): the return type, braces and return statement of this
   definition are not visible here -- presumably it returns NULL after
   the message; confirm. */
40 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
42 msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
/* PostgreSQL type OIDs.  The Oid that PQftype() reports for each
   result column is compared against these values (for example
   NUMERICOID) to decide how that column is turned into a variable. */
53 /* These macros must be the same as in catalog/pg_types.h from the postgres source */
66 #define BPCHAROID 1042
67 #define VARCHAROID 1043
70 #define TIMESTAMPOID 1114
71 #define TIMESTAMPTZOID 1184
72 #define INTERVALOID 1186
73 #define TIMETZOID 1266
74 #define NUMERICOID 1700
/* Casereader callbacks for the PostgreSQL reader, and the class table
   that psql_open_reader hands to casereader_create_sequential together
   with the reader state R. */
76 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
78 static struct ccase *psql_casereader_read (struct casereader *, void *);
80 static const struct casereader_class psql_casereader_class =
83 psql_casereader_destroy,
/* Some fields of struct psql_reader (not all are visible here):
   integer_datetimes -- true when the server reports its
     "integer_datetimes" parameter as "on"; selects how date/time
     values are decoded.
   postgres_epoch -- an offset in days (it is multiplied by 24 * 3600
     when converting dates); presumably the offset of 2000-01-01 from
     calendar_gregorian_to_offset -- confirm.
   dict -- the dictionary built from the query's result columns.
   vmap -- indexed by result column number; the variable each column
     is read into.
   fetch_cmd -- the "FETCH FORWARD n FROM pspp" command used to refill
     the row cache. */
94 bool integer_datetimes;
96 double postgres_epoch;
99 struct dictionary *dict;
101 /* An array of variable pointers, which maps psql column numbers into
103 struct variable **vmap;
106 struct string fetch_cmd;
111 static struct ccase *set_value (struct psql_reader *r);
// data_to_native (first variant): copies LEN bytes from IN_ to OUT_.
// Its loop body is not visible here; presumably a straight byte copy,
// used when the host byte order already matches PostgreSQL's binary
// (network, big-endian) order -- confirm.
117 data_to_native (const void *in_, void *out_, int len)
120 const unsigned char *in = in_;
121 unsigned char *out = out_;
122 for (i = 0 ; i < len ; ++i )
// data_to_native (second variant): copies LEN bytes from IN_ to OUT_
// in reverse order (out[len - i - 1] = in[i]), i.e. swaps the byte
// order of a LEN-byte value.  Presumably the little-endian-host
// variant -- confirm against the conditional that selects it.
127 data_to_native (const void *in_, void *out_, int len)
130 const unsigned char *in = in_;
131 unsigned char *out = out_;
132 for (i = 0 ; i < len ; ++i )
133 out[len - i - 1] = in[i];
// GET_VALUE (IN, OUT): decodes sizeof (OUT) bytes starting at *IN into
// the lvalue OUT using data_to_native.  IN is the address of a read
// cursor (callers pass &vptr).
// NOTE(review): the continuation line(s) that presumably advance *IN
// by sz are not visible here -- confirm.
138 #define GET_VALUE(IN, OUT) do { \
139 size_t sz = sizeof (OUT); \
140 data_to_native (*(IN), &(OUT), sz) ; \
147 dump (const unsigned char *x, int l)
// Debugging aid: prints the L bytes at X with printf, first as
// two-digit hex values, then again as characters.
151 for (i = 0; i < l ; ++i)
153 printf ("%02x ", x[i]);
158 for (i = 0; i < l ; ++i)
161 printf ("%c ", x[i]);
// create_var: adds a variable of WIDTH to r->dict, named after
// SUGGESTED_NAME (made unique with dict_make_unique_var_name), sets
// FMT as both its print and write format, and adds its value count
// to r->value_cnt.  It also grows r->vmap to COL + 1 entries so that
// result column COL can map to the new variable.
// Callers pass COL = -1 for the companion "-zone" and "-months"
// variables, which have no result column of their own.
// NOTE(review): with COL = -1, (col + 1) * sizeof (*r->vmap) is 0, so
// unless lines not shown here skip the vmap update for negative COL,
// vmap would be shrunk to zero entries -- confirm the guard exists.
// NOTE(review): r->value_cnt is incremented before the name check, so
// it is counted even when no variable ends up being created.
170 static struct variable *
171 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
172 int width, const char *suggested_name, int col)
174 unsigned long int vx = 0;
175 struct variable *var;
176 char name[VAR_NAME_LEN + 1];
178 r->value_cnt += value_cnt_from_width (width);
180 if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
182 msg (ME, _("Cannot create variable name from %s"), suggested_name);
186 var = dict_create_var (r->dict, name, width);
187 var_set_both_formats (var, fmt);
191 r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
194 r->vmapsize = col + 1;
// reload_cache: refills the row cache by running r->fetch_cmd (a
// FETCH FORWARD on cursor pspp) and storing the result in r->res.
// The refill counts as failed when the result status is not
// PGRES_TUPLES_OK or when no rows came back (presumably end of data).
// NOTE(review): confirm that the previous r->res is PQclear'ed before
// it is overwritten (those lines are not visible here).
205 reload_cache (struct psql_reader *r)
210 r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
212 if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
// psql_open_reader: connects using info->conninfo, runs info->sql
// through a binary cursor, builds a dictionary from the result columns
// (stored in *DICT), and returns a sequential casereader over the rows.
// Errors are reported with msg (ME, ...); the error path destroys the
// dictionary and the reader state (presumably returning NULL -- confirm).
224 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
227 int n_fields, n_tuples;
228 PGresult *qres = NULL;
229 casenumber n_cases = CASENUMBER_MAX;
231 struct psql_reader *r = xzalloc (sizeof *r);
232 struct string query ;
234 r->conn = PQconnectdb (info->conninfo);
235 if ( NULL == r->conn)
237 msg (ME, _("Memory error whilst opening psql source"));
241 if ( PQstatus (r->conn) != CONNECTION_OK )
243 msg (ME, _("Error opening psql source: %s."),
244 PQerrorMessage (r->conn));
// NOTE(review): PQparameterStatus returns NULL for a parameter the
// server has not reported.  vers (and dt below) are used without a
// NULL check, and the return value of sscanf is not checked.
251 const char *vers = PQparameterStatus (r->conn, "server_version");
253 sscanf (vers, "%d", &ver_num);
258 _("Postgres server is version %s."
259 " Reading from versions earlier than 8.0 is not supported."),
267 const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
269 r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
// Refuse an unencrypted connection (PQgetssl returns NULL) unless
// info->allow_clear permits it.
273 if ( PQgetssl (r->conn) == NULL)
276 if (! info->allow_clear)
278 msg (ME, _("Connection is unencrypted, "
279 "but unencrypted connections have not been permitted."));
// Offset of 2000-01-01; presumably stored in r->postgres_epoch (the
// assignment target is not visible here) -- confirm.
285 calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
288 /* Create the dictionary and populate it */
289 *dict = r->dict = dict_create ();
292 const int enc = PQclientEncoding (r->conn);
294 /* According to section 22.2 of the Postgresql manual
295 a value of zero (SQL_ASCII) indicates
296 "a declaration of ignorance about the encoding".
297 Accordingly, we don't set the dictionary's encoding
298 if we find this value.
301 dict_set_encoding (r->dict, pg_encoding_to_char (enc));
305 select count (*) from (select * from medium) stupid_sql_standard;
// The query runs in a read-only serializable transaction through a
// BINARY cursor named pspp, so later FETCHes return binary values.
307 ds_init_cstr (&query,
308 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
309 "DECLARE pspp BINARY CURSOR FOR ");
311 ds_put_substring (&query, info->sql.ss);
313 qres = PQexec (r->conn, ds_cstr (&query));
315 if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
317 msg (ME, _("Error from psql source: %s."),
318 PQresultErrorMessage (qres));
325 /* Now use the count() function to find the total number of cases
326 that this query returns.
327 Doing this incurs some overhead. The server has to iterate every
328 case in order to find this number. However, it's performed on the
329 server side, and in all except the most huge databases the extra
330 overhead will be worth the effort.
331 On the other hand, most PSPP functions don't need to know this.
332 The GUI is the notable exception.
334 ds_init_cstr (&query, "SELECT count (*) FROM (");
335 ds_put_substring (&query, info->sql.ss);
336 ds_put_cstr (&query, ") stupid_sql_standard");
338 qres = PQexec (r->conn, ds_cstr (&query));
340 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
342 msg (ME, _("Error from psql source: %s."),
343 PQresultErrorMessage (qres));
346 n_cases = atol (PQgetvalue (qres, 0, 0));
// NOTE(review): atol() cannot report a malformed or out-of-range
// count; strtol with an end-pointer/errno check would.  query and qres
// are reused here and below -- confirm the earlier string and results
// are released (ds_destroy / PQclear) in lines not shown.
// Fetch the first row so that column types and sizes can be inspected.
349 qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
350 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
352 msg (ME, _("Error from psql source: %s."),
353 PQresultErrorMessage (qres));
357 n_tuples = PQntuples (qres);
358 n_fields = PQnfields (qres);
// One variable per result column; each starts with format F8.2, and
// widths and formats are refined from the column type and, when a row
// was fetched, from the first row's value length.
364 for (i = 0 ; i < n_fields ; ++i )
366 struct variable *var;
367 struct fmt_spec fmt = {FMT_F, 8, 2};
368 Oid type = PQftype (qres, i);
372 /* If there are no data then make a finger in the air
373 guess at the contents */
375 length = PQgetlength (qres, 0, i);
377 length = MAX_SHORT_STRING;
391 fmt.type = FMT_DOLLAR;
395 width = length > 0 ? length : 1;
403 width = (info->str_width == -1) ?
404 ROUND_UP (length, MAX_SHORT_STRING) : info->str_width;
410 width = length > 0 ? length : MAX_SHORT_STRING;
415 fmt.type = FMT_DTIME;
435 fmt.type = FMT_DATETIME;
447 msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
449 width = length > 0 ? length : MAX_SHORT_STRING;
/* Create the variable for column I.  For a NUMERIC column with at
   least one fetched row, read the NUMERIC header (n_digits, weight,
   sign, dscale) of the first value; the display width becomes the
   format's maximum output width and the decimals become dscale,
   capped at the maximum the format allows. */
455 var = create_var (r, &fmt, width, PQfname (qres, i), i);
456 if ( type == NUMERICOID && n_tuples > 0)
458 const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
460 int16_t n_digits, weight, dscale;
463 GET_VALUE (&vptr, n_digits);
464 GET_VALUE (&vptr, weight);
465 GET_VALUE (&vptr, sign);
466 GET_VALUE (&vptr, dscale);
470 fmt.w = fmt_max_output_width (fmt.type) ;
471 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
472 var_set_both_formats (var, &fmt);
475 /* Timezones need an extra variable */
481 ds_init_cstr (&name, var_get_name (var));
482 ds_put_cstr (&name, "-zone");
487 create_var (r, &fmt, 0, ds_cstr (&name), -1);
/* A numeric companion variable named after VAR with a "-months"
   suffix is also created; like "-zone" it is passed column -1. */
496 ds_init_cstr (&name, var_get_name (var));
497 ds_put_cstr (&name, "-months");
502 create_var (r, &fmt, 0, ds_cstr (&name), -1);
/* Step the cursor back one row, so that the row consumed by the
   FETCH FIRST above is returned again by the first FETCH FORWARD. */
513 qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
514 if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
/* Rows fetched per round trip: info->bsize, or 4096 when it is -1. */
521 r->cache_size = info->bsize != -1 ? info->bsize: 4096;
523 ds_init_empty (&r->fetch_cmd);
524 ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size);
528 return casereader_create_sequential
532 &psql_casereader_class, r);
/* Error path: release the dictionary and the partially built reader
   (psql_casereader_destroy is called directly, with no casereader). */
535 dict_destroy (*dict);
537 psql_casereader_destroy (NULL, r);
/* Casereader destroy callback for the PostgreSQL reader; also called
   directly, with a NULL reader, on psql_open_reader's error path.
   Frees the fetch command string and clears any cached PGresult.
   NOTE(review): the remaining cleanup (closing the connection, freeing
   vmap and R itself) is not visible here -- confirm it is in the
   omitted lines. */
543 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
545 struct psql_reader *r = r_;
549 ds_destroy (&r->fetch_cmd);
551 if (r->res) PQclear (r->res);
/* Casereader read callback.  When no result is cached yet, or all
   r->cache_size rows of the current batch have been consumed
   (r->tuple >= r->cache_size), fetches the next batch with
   reload_cache; then converts the current row with set_value.
   NOTE(review): the branch taken when reload_cache fails is not
   visible here. */
559 static struct ccase *
560 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
562 struct psql_reader *r = r_;
564 if ( NULL == r->res || r->tuple >= r->cache_size)
566 if ( ! reload_cache (r) )
570 return set_value (r);
573 static struct ccase *
574 set_value (struct psql_reader *r)
582 n_vars = PQnfields (r->res);
584 if ( r->tuple >= PQntuples (r->res))
587 c = case_create (r->value_cnt);
588 memset (case_data_rw_idx (c, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
591 for (i = 0 ; i < n_vars ; ++i )
593 Oid type = PQftype (r->res, i);
594 const struct variable *v = r->vmap[i];
595 union value *val = case_data_rw (c, v);
597 union value *val1 = NULL;
604 if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
606 const struct variable *v1 = NULL;
607 v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
609 val1 = case_data_rw (c, v1);
617 if (PQgetisnull (r->res, r->tuple, i))
619 value_set_missing (val, var_get_width (v));
634 const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
635 int length = PQgetlength (r->res, r->tuple, i);
637 int var_width = var_get_width (v);
643 GET_VALUE (&vptr, x);
652 GET_VALUE (&vptr, x);
660 GET_VALUE (&vptr, x);
668 GET_VALUE (&vptr, x);
676 GET_VALUE (&vptr, n);
684 GET_VALUE (&vptr, n);
691 /* Postgres 8.3 uses 64 bits.
692 Earlier versions use 32 */
698 GET_VALUE (&vptr, x);
705 GET_VALUE (&vptr, x);
718 if ( r->integer_datetimes )
725 GET_VALUE (&vptr, things);
726 GET_VALUE (&vptr, us);
727 GET_VALUE (&vptr, days);
728 GET_VALUE (&vptr, months);
730 val->f = us / 1000000.0;
731 val->f += days * 24 * 3600;
737 uint32_t days, months;
740 GET_VALUE (&vptr, seconds);
741 GET_VALUE (&vptr, days);
742 GET_VALUE (&vptr, months);
745 val->f += days * 24 * 3600;
756 GET_VALUE (&vptr, x);
758 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
764 if ( r->integer_datetimes)
767 GET_VALUE (&vptr, x);
768 val->f = x / 1000000.0;
773 GET_VALUE (&vptr, x);
782 if ( r->integer_datetimes)
787 GET_VALUE (&vptr, x);
788 val->f = x / 1000000.0;
794 GET_VALUE (&vptr, x);
798 GET_VALUE (&vptr, zone);
799 val1->f = zone / 3600.0;
806 if ( r->integer_datetimes)
810 GET_VALUE (&vptr, x);
814 val->f = (x + r->postgres_epoch * 24 * 3600 );
820 GET_VALUE (&vptr, x);
822 val->f = (x + r->postgres_epoch * 24 * 3600 );
830 memcpy (val->s, (char *) vptr, MIN (length, var_width));
837 int16_t n_digits, weight, dscale;
840 GET_VALUE (&vptr, n_digits);
841 GET_VALUE (&vptr, weight);
842 GET_VALUE (&vptr, sign);
843 GET_VALUE (&vptr, dscale);
850 fmt.w = fmt_max_output_width (fmt.type) ;
851 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
852 var_set_both_formats (v, &fmt);
856 for (i = 0 ; i < n_digits; ++i)
859 GET_VALUE (&vptr, x);
860 f += x * pow (10000, weight--);