1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader-provider.h>
20 #include <libpspp/message.h>
21 #include <gl/xalloc.h>
22 #include <data/dictionary.h>
25 #include "psql-reader.h"
33 #define _(msgid) gettext (msgid)
34 #define N_(msgid) (msgid)
/* Variant of psql_open_reader whose only visible statement reports that
   postgres support was not compiled into this build.
   NOTE(review): a second psql_open_reader appears later in the file, so
   the two are presumably selected by a preprocessor conditional; that
   conditional, the braces and the return statement are not visible in
   this listing -- confirm against the full file. */
39 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
41 msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
52 /* These macros must be the same as in catalog/pg_type.h from the postgres source */
65 #define BPCHAROID 1042
66 #define VARCHAROID 1043
69 #define TIMESTAMPOID 1114
70 #define TIMESTAMPTZOID 1184
71 #define INTERVALOID 1186
72 #define TIMETZOID 1266
73 #define NUMERICOID 1700
75 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
77 static bool psql_casereader_read (struct casereader *, void *,
80 static const struct casereader_class psql_casereader_class =
83 psql_casereader_destroy,
92 bool integer_datetimes;
94 double postgres_epoch;
97 struct dictionary *dict;
100 struct ccase first_case;
102 /* An array of ints, which maps psql column numbers into
103 pspp variable numbers */
109 static void set_value (const struct psql_reader *r,
110 PGresult *res, struct ccase *c);
/* First of two data_to_native definitions in this file.  Views IN_ and
   OUT_ as arrays of unsigned char and iterates over LEN bytes.
   NOTE(review): the loop body is not visible in this listing;
   presumably this is the variant that copies bytes unchanged and is
   chosen by a byte-order preprocessor test -- confirm. */
116 data_to_native (const void *in_, void *out_, int len)
119 const unsigned char *in = in_;
120 unsigned char *out = out_;
121 for (i = 0 ; i < len ; ++i )
/* Second data_to_native definition: writes the LEN bytes found at IN_
   into OUT_ in reverse order, i.e. it swaps the byte order of a
   LEN-byte value.
   NOTE(review): presumably the alternative to the definition above,
   selected by a byte-order preprocessor test that is not visible
   here -- confirm. */
126 data_to_native (const void *in_, void *out_, int len)
129 const unsigned char *in = in_;
130 unsigned char *out = out_;
131 for (i = 0 ; i < len ; ++i )
/* Byte I of the input lands at the mirrored position of the output. */
132 out[len - i - 1] = in[i];
/* GET_VALUE (IN, OUT): converts sizeof (OUT) bytes located at *IN into
   OUT by calling data_to_native.  IN is dereferenced, so callers pass
   the address of their read pointer (e.g. &vptr).  The size_t SZ is
   passed to data_to_native's int LEN parameter.
   NOTE(review): the closing lines of the macro are not visible in this
   listing; callers invoke it several times in a row on the same
   pointer, so it presumably also advances *IN by SZ -- confirm. */
137 #define GET_VALUE(IN, OUT) do { \
138 size_t sz = sizeof (OUT); \
139 data_to_native (*(IN), &(OUT), sz) ; \
/* Debugging aid: prints the L bytes starting at X with printf, making
   two passes over the buffer. */
146 dump (const unsigned char *x, int l)
/* First pass: each byte as a two-digit hexadecimal number. */
150 for (i = 0; i < l ; ++i)
152 printf ("%02x ", x[i]);
/* Second pass: each byte as a character.
   NOTE(review): lines between the loop header and the printf are not
   visible; there may be a printability test guarding it -- confirm. */
157 for (i = 0; i < l ; ++i)
160 printf ("%c ", x[i]);
/* Creates a variable of the given WIDTH in R's dictionary, named after
   SUGGESTED_NAME (made unique within the dictionary), and sets both of
   its formats to FMT.  COL is the psql column the variable belongs to;
   R->vmap is grown to COL + 1 entries and R->vmapsize updated to match.
   R->value_cnt is increased by the number of values WIDTH requires.
   NOTE(review): the return statements are not visible in this listing;
   presumably the new variable is returned, and NULL after the naming
   error -- confirm. */
169 static struct variable *
170 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
171 int width, const char *suggested_name, int col)
173 unsigned long int vx = 0;
175 struct variable *var;
176 char name[VAR_NAME_LEN + 1];
/* NOTE(review): the value count is bumped before the name check below,
   so a naming failure leaves it incremented unless the hidden error
   path compensates -- confirm. */
178 r->value_cnt += value_cnt_from_width (width);
180 if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
182 msg (ME, _("Cannot create variable name from %s"), suggested_name);
186 var = dict_create_var (r->dict, name, width);
187 var_set_both_formats (var, fmt);
189 vidx = var_get_dict_index (var);
/* NOTE(review): psql_open_reader calls this function with COL == -1 for
   its auxiliary variables; the lines around this resize are not
   visible, so presumably it is guarded by a test on COL -- confirm. */
193 r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (int));
196 r->vmapsize = col + 1;
/* Opens a casereader over the result of a PostgreSQL query.
   Connects using INFO->conninfo, checks the connection, declares a
   binary cursor named "pspp" over INFO->sql inside a read-only
   serializable transaction, fetches the first row, and builds a new
   dictionary (stored in *DICT and in the reader) with one variable per
   result column.  The first row is converted into a cached case before
   the sequential casereader is created.
   On the visible error path the dictionary and the reader are
   destroyed.
   NOTE(review): many lines of this function (braces, case labels,
   gotos, returns) are not visible in this listing. */
203 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
207 PGresult *res = NULL;
209 struct psql_reader *r = xzalloc (sizeof *r);
210 struct string query ;
213 r->conn = PQconnectdb (info->conninfo);
214 if ( NULL == r->conn)
216 msg (ME, _("Memory error whilst opening psql source"));
220 if ( PQstatus (r->conn) != CONNECTION_OK )
222 msg (ME, _("Error opening psql source: %s."),
223 PQerrorMessage (r->conn));
/* NOTE(review): PQparameterStatus returns NULL when the parameter is
   not known; no NULL check on VERS is visible before the sscanf, nor
   is the sscanf result checked -- confirm against the full file. */
230 const char *vers = PQparameterStatus (r->conn, "server_version");
232 sscanf (vers, "%d", &v1);
237 _("Postgres server is version %s."
238 " Reading from versions earlier than 8.0 is not supported."),
/* The server reports whether date/time values are sent as integers;
   the flag is true only when the parameter equals "on" (case
   insensitively).
   NOTE(review): no NULL check on DT is visible before strcasecmp --
   confirm. */
246 const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
248 r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
/* A NULL SSL handle is treated as an unencrypted connection, which is
   an error unless INFO->allow_clear is set. */
252 if ( PQgetssl (r->conn) == NULL)
255 if (! info->allow_clear)
257 msg (ME, _("Connection is unencrypted, "
258 "but unencrypted connections have not been permitted."));
/* Offset of 1 January 2000.
   NOTE(review): the start of this statement is not visible; presumably
   the result is stored in r->postgres_epoch -- confirm. */
264 calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
267 /* Create the dictionary and populate it */
268 *dict = r->dict = dict_create ();
/* The caller's SQL text is appended verbatim to the cursor
   declaration. */
270 ds_init_cstr (&query, "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; DECLARE pspp BINARY CURSOR FOR ");
271 ds_put_substring (&query, info->sql.ss);
273 res = PQexec (r->conn, ds_cstr (&query));
275 if ( PQresultStatus (res) != PGRES_COMMAND_OK )
277 msg (ME, _("Error from psql source: %s."),
278 PQresultErrorMessage (res));
/* NOTE(review): RES is reassigned here; whether the previous result
   and QUERY are released in the lines not shown cannot be determined
   from this listing -- confirm. */
284 res = PQexec (r->conn, "FETCH FIRST FROM pspp");
285 if ( PQresultStatus (res) != PGRES_TUPLES_OK )
287 msg (ME, _("Error from psql source: %s."),
288 PQresultErrorMessage (res));
292 n_fields = PQnfields (res);
/* One dictionary variable is created per result column.  The format
   starts as F8.2 and is adjusted according to the column's type OID;
   the case labels selecting each branch are not visible here. */
298 for (i = 0 ; i < n_fields ; ++i )
300 struct variable *var;
301 struct fmt_spec fmt = {FMT_F, 8, 2};
302 Oid type = PQftype (res, i);
/* Widths below are derived from the length of the field in row 0 of
   the first fetch.
   NOTE(review): no check that the fetch returned at least one row is
   visible before row 0 is inspected -- confirm. */
304 int length = PQgetlength (res, 0, i);
318 fmt.type = FMT_DOLLAR;
322 width = length > 0 ? length : 1;
/* A str_width of -1 means "derive the width from the data", rounded up
   to a multiple of MAX_SHORT_STRING; any other value is used as is. */
330 width = (info->str_width == -1) ?
331 ROUND_UP (length, MAX_SHORT_STRING) : info->str_width;
337 width = length > 0 ? length : MAX_SHORT_STRING;
342 fmt.type = FMT_DTIME;
362 fmt.type = FMT_DATETIME;
374 msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
376 width = length > 0 ? length : MAX_SHORT_STRING;
383 var = create_var (r, &fmt, width, PQfname (res, i), i);
385 /* Timezones need an extra variable */
/* The extra variable is numeric (width 0), is named after the main
   variable with a "-zone" suffix, and is created with column -1. */
391 ds_init_cstr (&name, var_get_name (var));
392 ds_put_cstr (&name, "-zone");
397 create_var (r, &fmt, 0, ds_cstr (&name), -1);
/* Likewise an extra numeric variable with a "-months" suffix.
   NOTE(review): the condition selecting this branch is not visible;
   presumably it applies to interval columns -- confirm. */
406 ds_init_cstr (&name, var_get_name (var));
407 ds_put_cstr (&name, "-months");
412 create_var (r, &fmt, 0, ds_cstr (&name), -1);
422 /* Create the first case, and cache it */
423 r->used_first_case = false;
426 case_create (&r->first_case, r->value_cnt);
/* Fill every value slot of the case with spaces before converting the
   row into it. */
427 memset (case_data_rw_idx (&r->first_case, 0)->s,
428 ' ', MAX_SHORT_STRING * r->value_cnt);
430 set_value (r, res, &r->first_case);
434 return casereader_create_sequential
438 &psql_casereader_class, r);
/* Error path: release the dictionary and the reader. */
442 dict_destroy (*dict);
444 psql_casereader_destroy (NULL, r);
/* Destroy callback listed in psql_casereader_class.  R_ is the
   struct psql_reader to dispose of; READER is unused, and
   psql_open_reader passes NULL for it on its error path.
   NOTE(review): the cleanup statements themselves are not visible in
   this listing. */
450 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
452 struct psql_reader *r = r_;
/* Read callback for the psql casereader.  R_ is the struct psql_reader;
   READER is unused.  The first call takes the branch for the case
   cached by psql_open_reader; later calls fetch the next row from the
   "pspp" cursor and convert it with set_value.
   NOTE(review): the remainder of the parameter list, the return
   statements and any release of RES are not visible in this listing. */
463 psql_casereader_read (struct casereader *reader UNUSED, void *r_,
468 struct psql_reader *r = r_;
/* The cached first case has not been handed out yet; mark it used.
   NOTE(review): the statements that deliver it are not visible. */
470 if ( !r->used_first_case )
473 r->used_first_case = true;
/* Create a fresh case and fill every value slot with spaces. */
477 case_create (cc, r->value_cnt);
478 memset (case_data_rw_idx (cc, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
480 res = PQexec (r->conn, "FETCH NEXT FROM pspp");
/* Either a failed fetch or an empty result takes this branch.
   NOTE(review): what the branch does is not visible -- confirm. */
481 if ( PQresultStatus (res) != PGRES_TUPLES_OK || PQntuples (res) < 1)
488 set_value (r, res, cc);
496 set_value (const struct psql_reader *r,
497 PGresult *res, struct ccase *c)
500 int n_vars = PQnfields (res);
502 for (i = 0 ; i < n_vars ; ++i )
504 Oid type = PQftype (res, i);
505 struct variable *v = dict_get_var (r->dict, r->vmap[i]);
506 union value *val = case_data_rw (c, v);
507 const struct variable *v1 = NULL;
508 union value *val1 = NULL;
510 if (i < r->vmapsize && r->vmap[i] + 1 < dict_get_var_cnt (r->dict))
512 v1 = dict_get_var (r->dict, r->vmap[i] + 1);
514 val1 = case_data_rw (c, v1);
518 if (PQgetisnull (res, 0, i))
520 value_set_missing (val, var_get_width (v));
535 const uint8_t *vptr = (const uint8_t *) PQgetvalue (res, 0, i);
536 int length = PQgetlength (res, 0, i);
538 int var_width = var_get_width (v);
544 GET_VALUE (&vptr, x);
553 GET_VALUE (&vptr, x);
561 GET_VALUE (&vptr, x);
569 GET_VALUE (&vptr, x);
577 GET_VALUE (&vptr, n);
585 GET_VALUE (&vptr, n);
593 GET_VALUE (&vptr, x);
600 if ( r->integer_datetimes )
607 GET_VALUE (&vptr, things);
608 GET_VALUE (&vptr, us);
609 GET_VALUE (&vptr, days);
610 GET_VALUE (&vptr, months);
612 val->f = us / 1000000.0;
613 val->f += days * 24 * 3600;
619 uint32_t days, months;
622 GET_VALUE (&vptr, seconds);
623 GET_VALUE (&vptr, days);
624 GET_VALUE (&vptr, months);
627 val->f += days * 24 * 3600;
638 GET_VALUE (&vptr, x);
640 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
646 if ( r->integer_datetimes)
649 GET_VALUE (&vptr, x);
650 val->f = x / 1000000.0;
655 GET_VALUE (&vptr, x);
664 if ( r->integer_datetimes)
669 GET_VALUE (&vptr, x);
670 val->f = x / 1000000.0;
676 GET_VALUE (&vptr, x);
680 GET_VALUE (&vptr, zone);
681 val1->f = zone / 3600.0;
688 if ( r->integer_datetimes)
692 GET_VALUE (&vptr, x);
696 val->f = (x + r->postgres_epoch * 24 * 3600 );
702 GET_VALUE (&vptr, x);
704 val->f = (x + r->postgres_epoch * 24 * 3600 );
712 memcpy (val->s, (char *) vptr, MIN (length, var_width));
719 int16_t n_digits, weight, dscale;
722 GET_VALUE (&vptr, n_digits);
723 GET_VALUE (&vptr, weight);
724 GET_VALUE (&vptr, sign);
725 GET_VALUE (&vptr, dscale);
731 fmt.w = fmt_max_output_width (fmt.type) ;
732 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
733 var_set_both_formats (v, &fmt);
736 for (i = 0 ; i < n_digits; ++i)
739 GET_VALUE (&vptr, x);
740 f += x * pow (10000, weight--);