1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader-provider.h>
20 #include <libpspp/message.h>
21 #include <gl/xalloc.h>
22 #include <data/dictionary.h>
25 #include "psql-reader.h"
31 #include <libpspp/str.h>
/* Message-marking helpers: _() wraps MSGID in a call to gettext (),
   while N_() expands to its argument unchanged. */
34 #define _(msgid) gettext (msgid)
35 #define N_(msgid) (msgid)
/* Variant of psql_open_reader that uses neither INFO nor DICT and
   only reports that PostgreSQL support was not compiled in.
   NOTE(review): a second psql_open_reader is defined later in this
   file; confirm which preprocessor condition selects this one. */
40 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
42 msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
53 /* These macros must be the same as in catalog/pg_type.h from the postgres source.
   They name column type OIDs; NUMERICOID, for example, is compared
   against the value returned by PQftype (). */
66 #define BPCHAROID 1042
67 #define VARCHAROID 1043
70 #define TIMESTAMPOID 1114
71 #define TIMESTAMPTZOID 1184
72 #define INTERVALOID 1186
73 #define TIMETZOID 1266
74 #define NUMERICOID 1700
/* Casereader callbacks, declared ahead of the class table that
   refers to them. */
76 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
78 static struct ccase *psql_casereader_read (struct casereader *, void *);
/* Class table passed to casereader_create_sequential () by
   psql_open_reader. */
80 static const struct casereader_class psql_casereader_class =
83 psql_casereader_destroy,
94 bool integer_datetimes;
96 double postgres_epoch;
99 struct dictionary *dict;
101 /* An array of variable pointers, which maps psql column numbers into
103 struct variable **vmap;
106 struct string fetch_cmd;
111 static struct ccase *set_value (struct psql_reader *r);
/* Transfers LEN bytes of data from IN_ to OUT_, walking the input
   one byte at a time.
   NOTE(review): this is the first of two definitions with the same
   name and signature; presumably a byte-order preprocessor test
   selects one of them - confirm. */
117 data_to_native (const void *in_, void *out_, int len)
120 const unsigned char *in = in_;
121 unsigned char *out = out_;
122 for (i = 0 ; i < len ; ++i )
/* Second definition: copies the LEN bytes at IN_ into OUT_ in
   reverse order, so the first input byte becomes the last output
   byte. */
127 data_to_native (const void *in_, void *out_, int len)
130 const unsigned char *in = in_;
131 unsigned char *out = out_;
132 for (i = 0 ; i < len ; ++i )
133 out[len - i - 1] = in[i];
/* Reads sizeof (OUT) bytes from the buffer that *(IN) points to and
   stores them in OUT via data_to_native ().  IN is the address of
   the read pointer, not the read pointer itself (callers pass
   &vptr).
   NOTE(review): callers invoke this repeatedly on the same pointer
   to read consecutive fields, so the macro presumably also advances
   *(IN) past the bytes consumed - confirm. */
138 #define GET_VALUE(IN, OUT) do { \
139 size_t sz = sizeof (OUT); \
140 data_to_native (*(IN), &(OUT), sz) ; \
/* Debugging aid: prints the L bytes starting at X to stdout, first
   each byte as two hexadecimal digits and then each byte as a
   character. */
147 dump (const unsigned char *x, int l)
151 for (i = 0; i < l ; ++i)
153 printf ("%02x ", x[i]);
158 for (i = 0; i < l ; ++i)
161 printf ("%c ", x[i]);
/* Creates a variable of the given WIDTH in R's dictionary, with a
   unique name derived from SUGGESTED_NAME, and gives it FMT as both
   of its formats.  R->value_cnt grows by the number of values that
   WIDTH requires.  An error is reported if no unique name can be
   made.

   R->vmap is resized to COL + 1 entries and R->vmapsize is updated
   to match.
   NOTE(review): psql_open_reader passes COL == -1 for the extra
   "-zone" and "-months" variables; confirm that the vmap resizing is
   skipped for that value, since COL + 1 would then be zero. */
170 static struct variable *
171 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
172 int width, const char *suggested_name, int col)
174 unsigned long int vx = 0;
175 struct variable *var;
176 char name[VAR_NAME_LEN + 1];
178 r->value_cnt += value_cnt_from_width (width);
180 if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
182 msg (ME, _("Cannot create variable name from %s"), suggested_name);
186 var = dict_create_var (r->dict, name, width);
187 var_set_both_formats (var, fmt);
191 r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
194 r->vmapsize = col + 1;
/* Executes R's stored fetch command on R's connection and keeps the
   result in R->res.  A status other than PGRES_TUPLES_OK, or a
   result with no rows, is singled out as a special case.
   NOTE(review): psql_casereader_read treats a false return value as
   failure; presumably that special case is what returns false -
   confirm. */
205 reload_cache (struct psql_reader *r)
210 r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
212 if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
/* Opens a casereader on the PostgreSQL source described by INFO and
   stores a newly created dictionary in *DICT.

   In order, the function:
   - connects using INFO->conninfo and reports connection errors;
   - queries the "server_version" and "integer_datetimes" parameters;
   - reports an error for a non-SSL connection when INFO->allow_clear
     is not set;
   - declares the binary cursor "pspp" for INFO->sql inside a
     read-only, serializable transaction;
   - counts the rows with SELECT count (*) over the same query;
   - fetches the first row and creates one variable per result
     column, plus extra "-zone" and "-months" variables for some
     columns;
   - moves the cursor back one row and prepares the FETCH FORWARD
     command, using INFO->bsize rows per fetch, or 4096 when
     INFO->bsize is -1;
   - returns the reader made by casereader_create_sequential ().

   The statements after the return destroy *DICT and the reader
   state.

   NOTE(review): the row count is parsed with atol (), which cannot
   report a failed conversion.
   NOTE(review): QRES is reassigned by each PQexec () call; confirm
   that the previous result is released with PQclear () every
   time. */
224 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
227 int n_fields, n_tuples;
228 PGresult *qres = NULL;
229 casenumber n_cases = CASENUMBER_MAX;
231 struct psql_reader *r = xzalloc (sizeof *r);
232 struct string query ;
234 r->conn = PQconnectdb (info->conninfo);
235 if ( NULL == r->conn)
237 msg (ME, _("Memory error whilst opening psql source"));
241 if ( PQstatus (r->conn) != CONNECTION_OK )
243 msg (ME, _("Error opening psql source: %s."),
244 PQerrorMessage (r->conn));
/* NOTE(review): VERS is handed to sscanf () as-is and the sscanf ()
   result is not used; confirm that PQparameterStatus () cannot
   return NULL here. */
251 const char *vers = PQparameterStatus (r->conn, "server_version");
253 sscanf (vers, "%d", &ver_num);
258 _("Postgres server is version %s."
259 " Reading from versions earlier than 8.0 is not supported."),
/* NOTE(review): DT is handed to strcasecmp () as-is; confirm that
   PQparameterStatus () cannot return NULL here. */
267 const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
269 r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
273 if ( PQgetssl (r->conn) == NULL)
276 if (! info->allow_clear)
278 msg (ME, _("Connection is unencrypted, "
279 "but unencrypted connections have not been permitted."));
285 calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
288 /* Create the dictionary and populate it */
289 *dict = r->dict = dict_create ();
292 const int enc = PQclientEncoding (r->conn);
294 /* According to section 22.2 of the PostgreSQL manual
295 a value of zero (SQL_ASCII) indicates
296 "a declaration of ignorance about the encoding".
297 Accordingly, we don't set the dictionary's encoding
298 if we find this value.
301 dict_set_encoding (r->dict, pg_encoding_to_char (enc));
305 select count (*) from (select * from medium) stupid_sql_standard;
307 ds_init_cstr (&query,
308 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
309 "DECLARE pspp BINARY CURSOR FOR ");
311 ds_put_substring (&query, info->sql.ss);
313 qres = PQexec (r->conn, ds_cstr (&query));
315 if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
317 msg (ME, _("Error from psql source: %s."),
318 PQresultErrorMessage (qres));
325 /* Now use the count() function to find the total number of cases
326 that this query returns.
327 Doing this incurs some overhead. The server has to iterate over every
328 case in order to find this number. However, it's performed on the
329 server side, and in all but the very largest databases the extra
330 overhead will be worth the effort.
331 On the other hand, most PSPP functions don't need to know this.
332 The GUI is the notable exception.
334 ds_init_cstr (&query, "SELECT count (*) FROM (");
335 ds_put_substring (&query, info->sql.ss);
336 ds_put_cstr (&query, ") stupid_sql_standard");
338 qres = PQexec (r->conn, ds_cstr (&query));
340 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
342 msg (ME, _("Error from psql source: %s."),
343 PQresultErrorMessage (qres));
346 n_cases = atol (PQgetvalue (qres, 0, 0));
349 qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
350 if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
352 msg (ME, _("Error from psql source: %s."),
353 PQresultErrorMessage (qres));
357 n_tuples = PQntuples (qres);
358 n_fields = PQnfields (qres);
364 for (i = 0 ; i < n_fields ; ++i )
366 struct variable *var;
367 struct fmt_spec fmt = {FMT_F, 8, 2};
368 Oid type = PQftype (qres, i);
372 /* If there are no data then make a rough
373 guess at the contents */
375 length = PQgetlength (qres, 0, i);
377 length = MAX_SHORT_STRING;
391 fmt.type = FMT_DOLLAR;
395 width = length > 0 ? length : 1;
403 width = (info->str_width == -1) ?
404 ROUND_UP (length, MAX_SHORT_STRING) : info->str_width;
410 width = length > 0 ? length : MAX_SHORT_STRING;
415 fmt.type = FMT_DTIME;
435 fmt.type = FMT_DATETIME;
447 msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
449 width = length > 0 ? length : MAX_SHORT_STRING;
455 if ( width == 0 && fmt_is_string (fmt.type))
456 fmt.w = width = MAX_SHORT_STRING;
459 var = create_var (r, &fmt, width, PQfname (qres, i), i);
460 if ( type == NUMERICOID && n_tuples > 0)
462 const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
464 int16_t n_digits, weight, dscale;
467 GET_VALUE (&vptr, n_digits);
468 GET_VALUE (&vptr, weight);
469 GET_VALUE (&vptr, sign);
470 GET_VALUE (&vptr, dscale);
474 fmt.w = fmt_max_output_width (fmt.type) ;
475 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
476 var_set_both_formats (var, &fmt);
479 /* Timezones need an extra variable */
485 ds_init_cstr (&name, var_get_name (var));
486 ds_put_cstr (&name, "-zone");
491 create_var (r, &fmt, 0, ds_cstr (&name), -1);
500 ds_init_cstr (&name, var_get_name (var));
501 ds_put_cstr (&name, "-months");
506 create_var (r, &fmt, 0, ds_cstr (&name), -1);
517 qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
518 if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
525 r->cache_size = info->bsize != -1 ? info->bsize: 4096;
527 ds_init_empty (&r->fetch_cmd);
528 ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size);
532 return casereader_create_sequential
536 &psql_casereader_class, r);
539 dict_destroy (*dict);
541 psql_casereader_destroy (NULL, r);
/* Casereader destroy callback.  R_ is the struct psql_reader to tear
   down; READER is not used.  Destroys the stored fetch command and
   clears the cached query result if there is one.
   NOTE(review): psql_open_reader also calls this directly on a
   reader obtained from xzalloc (), presumably on its failure path;
   confirm that ds_destroy () is safe when R->fetch_cmd was never
   initialised with ds_init_empty (). */
547 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
549 struct psql_reader *r = r_;
553 ds_destroy (&r->fetch_cmd);
555 if (r->res) PQclear (r->res);
/* Casereader read callback.  R_ is the struct psql_reader; READER is
   not used.  Tests whether there is no cached result or whether
   R->tuple has reached R->cache_size, calls reload_cache () and
   checks it for failure, and returns the case built by
   set_value (). */
563 static struct ccase *
564 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
566 struct psql_reader *r = r_;
568 if ( NULL == r->res || r->tuple >= r->cache_size)
570 if ( ! reload_cache (r) )
574 return set_value (r);
577 static struct ccase *
578 set_value (struct psql_reader *r)
586 n_vars = PQnfields (r->res);
588 if ( r->tuple >= PQntuples (r->res))
591 c = case_create (r->value_cnt);
592 memset (case_data_rw_idx (c, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
595 for (i = 0 ; i < n_vars ; ++i )
597 Oid type = PQftype (r->res, i);
598 const struct variable *v = r->vmap[i];
599 union value *val = case_data_rw (c, v);
601 union value *val1 = NULL;
608 if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
610 const struct variable *v1 = NULL;
611 v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
613 val1 = case_data_rw (c, v1);
621 if (PQgetisnull (r->res, r->tuple, i))
623 value_set_missing (val, var_get_width (v));
638 const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
639 int length = PQgetlength (r->res, r->tuple, i);
641 int var_width = var_get_width (v);
647 GET_VALUE (&vptr, x);
656 GET_VALUE (&vptr, x);
664 GET_VALUE (&vptr, x);
672 GET_VALUE (&vptr, x);
680 GET_VALUE (&vptr, n);
688 GET_VALUE (&vptr, n);
695 /* Postgres 8.3 uses 64 bits.
696 Earlier versions use 32 */
702 GET_VALUE (&vptr, x);
709 GET_VALUE (&vptr, x);
722 if ( r->integer_datetimes )
729 GET_VALUE (&vptr, things);
730 GET_VALUE (&vptr, us);
731 GET_VALUE (&vptr, days);
732 GET_VALUE (&vptr, months);
734 val->f = us / 1000000.0;
735 val->f += days * 24 * 3600;
741 uint32_t days, months;
744 GET_VALUE (&vptr, seconds);
745 GET_VALUE (&vptr, days);
746 GET_VALUE (&vptr, months);
749 val->f += days * 24 * 3600;
760 GET_VALUE (&vptr, x);
762 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
768 if ( r->integer_datetimes)
771 GET_VALUE (&vptr, x);
772 val->f = x / 1000000.0;
777 GET_VALUE (&vptr, x);
786 if ( r->integer_datetimes)
791 GET_VALUE (&vptr, x);
792 val->f = x / 1000000.0;
798 GET_VALUE (&vptr, x);
802 GET_VALUE (&vptr, zone);
803 val1->f = zone / 3600.0;
810 if ( r->integer_datetimes)
814 GET_VALUE (&vptr, x);
818 val->f = (x + r->postgres_epoch * 24 * 3600 );
824 GET_VALUE (&vptr, x);
826 val->f = (x + r->postgres_epoch * 24 * 3600 );
834 memcpy (val->s, (char *) vptr, MIN (length, var_width));
841 int16_t n_digits, weight, dscale;
844 GET_VALUE (&vptr, n_digits);
845 GET_VALUE (&vptr, weight);
846 GET_VALUE (&vptr, sign);
847 GET_VALUE (&vptr, dscale);
854 fmt.w = fmt_max_output_width (fmt.type) ;
855 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
856 var_set_both_formats (v, &fmt);
860 for (i = 0 ; i < n_digits; ++i)
863 GET_VALUE (&vptr, x);
864 f += x * pow (10000, weight--);