1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader-provider.h>
20 #include <libpspp/message.h>
21 #include <gl/xalloc.h>
22 #include <data/dictionary.h>
25 #include "psql-reader.h"
31 #include <libpspp/str.h>
34 #define _(msgid) gettext (msgid)
35 #define N_(msgid) (msgid)
40 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
42 msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
53 /* These macros must match the type OIDs defined in catalog/pg_type.h in the PostgreSQL source. */
66 #define BPCHAROID 1042
67 #define VARCHAROID 1043
70 #define TIMESTAMPOID 1114
71 #define TIMESTAMPTZOID 1184
72 #define INTERVALOID 1186
73 #define TIMETZOID 1266
74 #define NUMERICOID 1700
76 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
78 static struct ccase *psql_casereader_read (struct casereader *, void *);
80 static const struct casereader_class psql_casereader_class =
83 psql_casereader_destroy,
94 bool integer_datetimes;
96 double postgres_epoch;
99 struct dictionary *dict;
101 /* An array of variable pointers, which maps psql column numbers into
103 struct variable **vmap;
106 struct string fetch_cmd;
111 static struct ccase *set_value (struct psql_reader *r);
117 data_to_native (const void *in_, void *out_, int len)
120 const unsigned char *in = in_;
121 unsigned char *out = out_;
122 for (i = 0 ; i < len ; ++i )
127 data_to_native (const void *in_, void *out_, int len)
130 const unsigned char *in = in_;
131 unsigned char *out = out_;
132 for (i = 0 ; i < len ; ++i )
133 out[len - i - 1] = in[i];
138 #define GET_VALUE(IN, OUT) do { \
139 size_t sz = sizeof (OUT); \
140 data_to_native (*(IN), &(OUT), sz) ; \
147 dump (const unsigned char *x, int l)
151 for (i = 0; i < l ; ++i)
153 printf ("%02x ", x[i]);
158 for (i = 0; i < l ; ++i)
161 printf ("%c ", x[i]);
170 static struct variable *
171 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
172 int width, const char *suggested_name, int col)
174 unsigned long int vx = 0;
175 struct variable *var;
176 char name[VAR_NAME_LEN + 1];
178 r->value_cnt += value_cnt_from_width (width);
180 if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
182 msg (ME, _("Cannot create variable name from %s"), suggested_name);
186 var = dict_create_var (r->dict, name, width);
187 var_set_both_formats (var, fmt);
191 r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
194 r->vmapsize = col + 1;
205 reload_cache (struct psql_reader *r)
210 r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
212 if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
224 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
227 int n_fields, n_tuples;
228 PGresult *qres = NULL;
229 casenumber n_cases = CASENUMBER_MAX;
231 struct psql_reader *r = xzalloc (sizeof *r);
232 struct string query ;
234 r->conn = PQconnectdb (info->conninfo);
235 if ( NULL == r->conn)
237 msg (ME, _("Memory error whilst opening psql source"));
241 if ( PQstatus (r->conn) != CONNECTION_OK )
243 msg (ME, _("Error opening psql source: %s."),
244 PQerrorMessage (r->conn));
251 const char *vers = PQparameterStatus (r->conn, "server_version");
253 sscanf (vers, "%d", &ver_num);
258 _("Postgres server is version %s."
259 " Reading from versions earlier than 8.0 is not supported."),
267 const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
269 r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
273 if ( PQgetssl (r->conn) == NULL)
276 if (! info->allow_clear)
278 msg (ME, _("Connection is unencrypted, "
279 "but unencrypted connections have not been permitted."));
285 calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
288 /* Create the dictionary and populate it */
289 *dict = r->dict = dict_create ();
292 select count (*) from (select * from medium) stupid_sql_standard;
295 ds_init_cstr (&query,
296 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
297 "DECLARE pspp BINARY CURSOR FOR ");
299 ds_put_substring (&query, info->sql.ss);
301 qres = PQexec (r->conn, ds_cstr (&query));
303 if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
305 msg (ME, _("Error from psql source: %s."),
306 PQresultErrorMessage (qres));
313 /* Now use the count() function to find the total number of cases
314 that this query returns.
315 Doing this incurs some overhead. The server has to iterate every
316 case in order to find this number. However, it's performed on the
317 server side, and for all but the very largest databases the extra
318 overhead is worth the effort.
319 On the other hand, most PSPP functions don't need to know this.
320 The GUI is the notable exception.
322 ds_init_cstr (&query, "SELECT count (*) FROM (");
323 ds_put_substring (&query, info->sql.ss);
324 ds_put_cstr (&query, ") stupid_sql_standard");
326 qres = PQexec (r->conn, ds_cstr (&query));
328 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
330 msg (ME, _("Error from psql source: %s."),
331 PQresultErrorMessage (qres));
334 n_cases = atol (PQgetvalue (qres, 0, 0));
337 qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
338 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
340 msg (ME, _("Error from psql source: %s."),
341 PQresultErrorMessage (qres));
345 n_tuples = PQntuples (qres);
346 n_fields = PQnfields (qres);
352 for (i = 0 ; i < n_fields ; ++i )
354 struct variable *var;
355 struct fmt_spec fmt = {FMT_F, 8, 2};
356 Oid type = PQftype (qres, i);
360 /* If there are no data, make a rough guess
361 at the contents */
363 length = PQgetlength (qres, 0, i);
365 length = MAX_SHORT_STRING;
379 fmt.type = FMT_DOLLAR;
383 width = length > 0 ? length : 1;
391 width = (info->str_width == -1) ?
392 ROUND_UP (length, MAX_SHORT_STRING) : info->str_width;
398 width = length > 0 ? length : MAX_SHORT_STRING;
403 fmt.type = FMT_DTIME;
423 fmt.type = FMT_DATETIME;
435 msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
437 width = length > 0 ? length : MAX_SHORT_STRING;
443 var = create_var (r, &fmt, width, PQfname (qres, i), i);
444 if ( type == NUMERICOID && n_tuples > 0)
446 const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
448 int16_t n_digits, weight, dscale;
451 GET_VALUE (&vptr, n_digits);
452 GET_VALUE (&vptr, weight);
453 GET_VALUE (&vptr, sign);
454 GET_VALUE (&vptr, dscale);
458 fmt.w = fmt_max_output_width (fmt.type) ;
459 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
460 var_set_both_formats (var, &fmt);
463 /* Timezones need an extra variable */
469 ds_init_cstr (&name, var_get_name (var));
470 ds_put_cstr (&name, "-zone");
475 create_var (r, &fmt, 0, ds_cstr (&name), -1);
484 ds_init_cstr (&name, var_get_name (var));
485 ds_put_cstr (&name, "-months");
490 create_var (r, &fmt, 0, ds_cstr (&name), -1);
501 qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
502 if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
509 r->cache_size = info->bsize != -1 ? info->bsize: 4096;
511 ds_init_empty (&r->fetch_cmd);
512 ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size);
516 return casereader_create_sequential
520 &psql_casereader_class, r);
523 dict_destroy (*dict);
525 psql_casereader_destroy (NULL, r);
531 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
533 struct psql_reader *r = r_;
537 ds_destroy (&r->fetch_cmd);
539 if (r->res) PQclear (r->res);
547 static struct ccase *
548 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
550 struct psql_reader *r = r_;
552 if ( NULL == r->res || r->tuple >= r->cache_size)
554 if ( ! reload_cache (r) )
558 return set_value (r);
561 static struct ccase *
562 set_value (struct psql_reader *r)
570 n_vars = PQnfields (r->res);
572 if ( r->tuple >= PQntuples (r->res))
575 c = case_create (r->value_cnt);
576 memset (case_data_rw_idx (c, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
579 for (i = 0 ; i < n_vars ; ++i )
581 Oid type = PQftype (r->res, i);
582 const struct variable *v = r->vmap[i];
583 union value *val = case_data_rw (c, v);
585 union value *val1 = NULL;
592 if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
594 const struct variable *v1 = NULL;
595 v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
597 val1 = case_data_rw (c, v1);
605 if (PQgetisnull (r->res, r->tuple, i))
607 value_set_missing (val, var_get_width (v));
622 const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
623 int length = PQgetlength (r->res, r->tuple, i);
625 int var_width = var_get_width (v);
631 GET_VALUE (&vptr, x);
640 GET_VALUE (&vptr, x);
648 GET_VALUE (&vptr, x);
656 GET_VALUE (&vptr, x);
664 GET_VALUE (&vptr, n);
672 GET_VALUE (&vptr, n);
679 /* Postgres 8.3 uses 64 bits.
680 Earlier versions use 32 */
686 GET_VALUE (&vptr, x);
693 GET_VALUE (&vptr, x);
706 if ( r->integer_datetimes )
713 GET_VALUE (&vptr, things);
714 GET_VALUE (&vptr, us);
715 GET_VALUE (&vptr, days);
716 GET_VALUE (&vptr, months);
718 val->f = us / 1000000.0;
719 val->f += days * 24 * 3600;
725 uint32_t days, months;
728 GET_VALUE (&vptr, seconds);
729 GET_VALUE (&vptr, days);
730 GET_VALUE (&vptr, months);
733 val->f += days * 24 * 3600;
744 GET_VALUE (&vptr, x);
746 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
752 if ( r->integer_datetimes)
755 GET_VALUE (&vptr, x);
756 val->f = x / 1000000.0;
761 GET_VALUE (&vptr, x);
770 if ( r->integer_datetimes)
775 GET_VALUE (&vptr, x);
776 val->f = x / 1000000.0;
782 GET_VALUE (&vptr, x);
786 GET_VALUE (&vptr, zone);
787 val1->f = zone / 3600.0;
794 if ( r->integer_datetimes)
798 GET_VALUE (&vptr, x);
802 val->f = (x + r->postgres_epoch * 24 * 3600 );
808 GET_VALUE (&vptr, x);
810 val->f = (x + r->postgres_epoch * 24 * 3600 );
818 memcpy (val->s, (char *) vptr, MIN (length, var_width));
825 int16_t n_digits, weight, dscale;
828 GET_VALUE (&vptr, n_digits);
829 GET_VALUE (&vptr, weight);
830 GET_VALUE (&vptr, sign);
831 GET_VALUE (&vptr, dscale);
838 fmt.w = fmt_max_output_width (fmt.type) ;
839 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
840 var_set_both_formats (v, &fmt);
844 for (i = 0 ; i < n_digits; ++i)
847 GET_VALUE (&vptr, x);
848 f += x * pow (10000, weight--);