calendar: Use sensible error reporting in calendar_gregorian_to_offset().
[pspp] / src / data / psql-reader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2008, 2009, 2010 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader-provider.h>
20 #include <libpspp/message.h>
21 #include <gl/xalloc.h>
22 #include <data/dictionary.h>
23 #include <math.h>
24 #include <stdlib.h>
25
26 #include "psql-reader.h"
27 #include "variable.h"
28 #include "format.h"
29 #include "calendar.h"
30
31 #include <inttypes.h>
32 #include <libpspp/misc.h>
33 #include <libpspp/str.h>
34
35 #include "minmax.h"
36
37 #include "gettext.h"
38 #define _(msgid) gettext (msgid)
39 #define N_(msgid) (msgid)
40
41
42 #if !PSQL_SUPPORT
43 struct casereader *
44 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
45 {
46   msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
47
48   return NULL;
49 }
50
51 #else
52
53 #include <stdint.h>
54 #include <libpq-fe.h>
55
56
57 /* Default width of string variables. */
58 #define PSQL_DEFAULT_WIDTH 8
59
60 /* These macros  must be the same as in catalog/pg_types.h from the postgres source */
61 #define BOOLOID            16
62 #define BYTEAOID           17
63 #define CHAROID            18
64 #define NAMEOID            19
65 #define INT8OID            20
66 #define INT2OID            21
67 #define INT4OID            23
68 #define TEXTOID            25
69 #define OIDOID             26
70 #define FLOAT4OID          700
71 #define FLOAT8OID          701
72 #define CASHOID            790
73 #define BPCHAROID          1042
74 #define VARCHAROID         1043
75 #define DATEOID            1082
76 #define TIMEOID            1083
77 #define TIMESTAMPOID       1114
78 #define TIMESTAMPTZOID     1184
79 #define INTERVALOID        1186
80 #define TIMETZOID          1266
81 #define NUMERICOID         1700
82
83 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
84
85 static struct ccase *psql_casereader_read (struct casereader *, void *);
86
87 static const struct casereader_class psql_casereader_class =
88   {
89     psql_casereader_read,
90     psql_casereader_destroy,
91     NULL,
92     NULL,
93   };
94
95 struct psql_reader
96 {
97   PGconn *conn;
98   PGresult *res;
99   int tuple;
100
101   bool integer_datetimes;
102
103   double postgres_epoch;
104
105   struct caseproto *proto;
106   struct dictionary *dict;
107
108   /* An array of ints, which maps psql column numbers into
109      pspp variables */
110   struct variable **vmap;
111   size_t vmapsize;
112
113   struct string fetch_cmd;
114   int cache_size;
115 };
116
117
118 static struct ccase *set_value (struct psql_reader *r);
119
120
121
122 #if WORDS_BIGENDIAN
123 static void
124 data_to_native (const void *in_, void *out_, int len)
125 {
126   int i;
127   const unsigned char *in = in_;
128   unsigned char *out = out_;
129   for (i = 0 ; i < len ; ++i )
130     out[i] = in[i];
131 }
132 #else
133 static void
134 data_to_native (const void *in_, void *out_, int len)
135 {
136   int i;
137   const unsigned char *in = in_;
138   unsigned char *out = out_;
139   for (i = 0 ; i < len ; ++i )
140     out[len - i - 1] = in[i];
141 }
142 #endif
143
144
145 #define GET_VALUE(IN, OUT) do { \
146     size_t sz = sizeof (OUT); \
147     data_to_native (*(IN), &(OUT), sz) ; \
148     (*IN) += sz; \
149 } while (false)
150
151
152 #if 0
153 static void
154 dump (const unsigned char *x, int l)
155 {
156   int i;
157
158   for (i = 0; i < l ; ++i)
159     {
160       printf ("%02x ", x[i]);
161     }
162
163   putchar ('\n');
164
165   for (i = 0; i < l ; ++i)
166     {
167       if ( isprint (x[i]))
168         printf ("%c ", x[i]);
169       else
170         printf ("   ");
171     }
172
173   putchar ('\n');
174 }
175 #endif
176
177 static struct variable *
178 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
179             int width, const char *suggested_name, int col)
180 {
181   unsigned long int vx = 0;
182   struct variable *var;
183   char name[VAR_NAME_LEN + 1];
184
185   if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
186     {
187       msg (ME, _("Cannot create variable name from %s"), suggested_name);
188       return NULL;
189     }
190
191   var = dict_create_var (r->dict, name, width);
192   var_set_both_formats (var, fmt);
193
194   if ( col != -1)
195     {
196       r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
197
198       r->vmap[col] = var;
199       r->vmapsize = col + 1;
200     }
201
202   return var;
203 }
204
205
206
207
208 /* Fill the cache */
209 static bool
210 reload_cache (struct psql_reader *r)
211 {
212   PQclear (r->res);
213   r->tuple = 0;
214
215   r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
216
217   if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
218     {
219       PQclear (r->res);
220       r->res = NULL;
221       return false;
222     }
223
224   return true;
225 }
226
227
228 struct casereader *
229 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
230 {
231   int i;
232   int n_fields, n_tuples;
233   PGresult *qres = NULL;
234   casenumber n_cases = CASENUMBER_MAX;
235
236   struct psql_reader *r = xzalloc (sizeof *r);
237   struct string query ;
238
239   r->conn = PQconnectdb (info->conninfo);
240   if ( NULL == r->conn)
241     {
242       msg (ME, _("Memory error whilst opening psql source"));
243       goto error;
244     }
245
246   if ( PQstatus (r->conn) != CONNECTION_OK )
247     {
248       msg (ME, _("Error opening psql source: %s."),
249            PQerrorMessage (r->conn));
250
251       goto error;
252     }
253
254   {
255     int ver_num;
256     const char *vers = PQparameterStatus (r->conn, "server_version");
257
258     sscanf (vers, "%d", &ver_num);
259
260     if ( ver_num < 8)
261       {
262         msg (ME,
263              _("Postgres server is version %s."
264                " Reading from versions earlier than 8.0 is not supported."),
265              vers);
266
267         goto error;
268       }
269   }
270
271   {
272     const char *dt =  PQparameterStatus (r->conn, "integer_datetimes");
273
274     r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
275   }
276
277 #if USE_SSL
278   if ( PQgetssl (r->conn) == NULL)
279 #endif
280     {
281       if (! info->allow_clear)
282         {
283           msg (ME, _("Connection is unencrypted, "
284                      "but unencrypted connections have not been permitted."));
285           goto error;
286         }
287     }
288
289   r->postgres_epoch = calendar_gregorian_to_offset (2000, 1, 1, NULL);
290
291
292   /* Create the dictionary and populate it */
293   *dict = r->dict = dict_create ();
294
295   {
296     const int enc = PQclientEncoding (r->conn);
297
298     /* According to section 22.2 of the Postgresql manual
299        a value of zero (SQL_ASCII) indicates
300        "a declaration of ignorance about the encoding".
301        Accordingly, we don't set the dictionary's encoding
302        if we find this value.
303     */
304     if ( enc != 0 )
305       dict_set_encoding (r->dict, pg_encoding_to_char (enc));
306   }
307
308   /*
309     select count (*) from (select * from medium) stupid_sql_standard;
310   */
311   ds_init_cstr (&query,
312                 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
313                 "DECLARE  pspp BINARY CURSOR FOR ");
314
315   ds_put_substring (&query, info->sql.ss);
316
317   qres = PQexec (r->conn, ds_cstr (&query));
318   ds_destroy (&query);
319   if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
320     {
321       msg (ME, _("Error from psql source: %s."),
322            PQresultErrorMessage (qres));
323       goto error;
324     }
325
326   PQclear (qres);
327
328
329   /* Now use the count() function to find the total number of cases
330      that this query returns.
331      Doing this incurs some overhead.  The server has to iterate every
332      case in order to find this number.  However, it's performed on the
333      server side, and in all except the most huge databases the extra
334      overhead will be worth the effort.
335      On the other hand, most PSPP functions don't need to know this.
336      The GUI is the notable exception.
337   */
338   ds_init_cstr (&query, "SELECT count (*) FROM (");
339   ds_put_substring (&query, info->sql.ss);
340   ds_put_cstr (&query, ") stupid_sql_standard");
341
342   qres = PQexec (r->conn, ds_cstr (&query));
343   ds_destroy (&query);
344   if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
345     {
346       msg (ME, _("Error from psql source: %s."),
347            PQresultErrorMessage (qres));
348       goto error;
349     }
350   n_cases = atol (PQgetvalue (qres, 0, 0));
351   PQclear (qres);
352
353   qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
354   if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
355     {
356       msg (ME, _("Error from psql source: %s."),
357            PQresultErrorMessage (qres));
358       goto error;
359     }
360
361   n_tuples = PQntuples (qres);
362   n_fields = PQnfields (qres);
363
364   r->proto = NULL;
365   r->vmap = NULL;
366   r->vmapsize = 0;
367
368   for (i = 0 ; i < n_fields ; ++i )
369     {
370       struct variable *var;
371       struct fmt_spec fmt = {FMT_F, 8, 2};
372       Oid type = PQftype (qres, i);
373       int width = 0;
374       int length ;
375
376       /* If there are no data then make a finger in the air 
377          guess at the contents */
378       if ( n_tuples > 0 )
379         length = PQgetlength (qres, 0, i);
380       else 
381         length = PSQL_DEFAULT_WIDTH;
382
383       switch (type)
384         {
385         case BOOLOID:
386         case OIDOID:
387         case INT2OID:
388         case INT4OID:
389         case INT8OID:
390         case FLOAT4OID:
391         case FLOAT8OID:
392           fmt.type = FMT_F;
393           break;
394         case CASHOID:
395           fmt.type = FMT_DOLLAR;
396           break;
397         case CHAROID:
398           fmt.type = FMT_A;
399           width = length > 0 ? length : 1;
400           fmt.d = 0;
401           fmt.w = 1;
402           break;
403         case TEXTOID:
404         case VARCHAROID:
405         case BPCHAROID:
406           fmt.type = FMT_A;
407           width = (info->str_width == -1) ?
408             ROUND_UP (length, PSQL_DEFAULT_WIDTH) : info->str_width;
409           fmt.w = width;
410           fmt.d = 0;
411           break;
412         case BYTEAOID:
413           fmt.type = FMT_AHEX;
414           width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
415           fmt.w = width * 2;
416           fmt.d = 0;
417           break;
418         case INTERVALOID:
419           fmt.type = FMT_DTIME;
420           width = 0;
421           fmt.d = 0;
422           fmt.w = 13;
423           break;
424         case DATEOID:
425           fmt.type = FMT_DATE;
426           width = 0;
427           fmt.w = 11;
428           fmt.d = 0;
429           break;
430         case TIMEOID:
431         case TIMETZOID:
432           fmt.type = FMT_TIME;
433           width = 0;
434           fmt.w = 11;
435           fmt.d = 0;
436           break;
437         case TIMESTAMPOID:
438         case TIMESTAMPTZOID:
439           fmt.type = FMT_DATETIME;
440           fmt.d = 0;
441           fmt.w = 22;
442           width = 0;
443           break;
444         case NUMERICOID:
445           fmt.type = FMT_E;
446           fmt.d = 2;
447           fmt.w = 40;
448           width = 0;
449           break;
450         default:
451           msg (MW, _("Unsupported OID %d.  SYSMIS values will be inserted."), type);
452           fmt.type = FMT_A;
453           width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
454           fmt.w = width ;
455           fmt.d = 0;
456           break;
457         }
458
459       if ( width == 0 && fmt_is_string (fmt.type))
460         fmt.w = width = PSQL_DEFAULT_WIDTH;
461
462
463       var = create_var (r, &fmt, width, PQfname (qres, i), i);
464       if ( type == NUMERICOID && n_tuples > 0)
465         {
466           const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
467           struct fmt_spec fmt;
468           int16_t n_digits, weight, dscale;
469           uint16_t sign;
470
471           GET_VALUE (&vptr, n_digits);
472           GET_VALUE (&vptr, weight);
473           GET_VALUE (&vptr, sign);
474           GET_VALUE (&vptr, dscale);
475
476           fmt.d = dscale;
477           fmt.type = FMT_E;
478           fmt.w = fmt_max_output_width (fmt.type) ;
479           fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
480           var_set_both_formats (var, &fmt);
481         }
482
483       /* Timezones need an extra variable */
484       switch (type)
485         {
486         case TIMETZOID:
487           {
488             struct string name;
489             ds_init_cstr (&name, var_get_name (var));
490             ds_put_cstr (&name, "-zone");
491             fmt.type = FMT_F;
492             fmt.w = 8;
493             fmt.d = 2;
494
495             create_var (r, &fmt, 0, ds_cstr (&name), -1);
496
497             ds_destroy (&name);
498           }
499           break;
500
501         case INTERVALOID:
502           {
503             struct string name;
504             ds_init_cstr (&name, var_get_name (var));
505             ds_put_cstr (&name, "-months");
506             fmt.type = FMT_F;
507             fmt.w = 3;
508             fmt.d = 0;
509
510             create_var (r, &fmt, 0, ds_cstr (&name), -1);
511
512             ds_destroy (&name);
513           }
514         default:
515           break;
516         }
517     }
518
519   PQclear (qres);
520
521   qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
522   if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
523     {
524       PQclear (qres);
525       goto error;
526     }
527   PQclear (qres);
528
529   r->cache_size = info->bsize != -1 ? info->bsize: 4096;
530
531   ds_init_empty (&r->fetch_cmd);
532   ds_put_format (&r->fetch_cmd,  "FETCH FORWARD %d FROM pspp", r->cache_size);
533
534   reload_cache (r);
535   r->proto = caseproto_ref (dict_get_proto (*dict));
536
537   return casereader_create_sequential
538     (NULL,
539      r->proto,
540      n_cases,
541      &psql_casereader_class, r);
542
543  error:
544   dict_destroy (*dict);
545
546   psql_casereader_destroy (NULL, r);
547   return NULL;
548 }
549
550
551 static void
552 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
553 {
554   struct psql_reader *r = r_;
555   if (r == NULL)
556     return ;
557
558   ds_destroy (&r->fetch_cmd);
559   free (r->vmap);
560   if (r->res) PQclear (r->res);
561   PQfinish (r->conn);
562   caseproto_unref (r->proto);
563
564   free (r);
565 }
566
567
568
569 static struct ccase *
570 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
571 {
572   struct psql_reader *r = r_;
573
574   if ( NULL == r->res || r->tuple >= r->cache_size)
575     {
576       if ( ! reload_cache (r) )
577         return false;
578     }
579
580   return set_value (r);
581 }
582
583 static struct ccase *
584 set_value (struct psql_reader *r)
585 {
586   struct ccase *c;
587   int n_vars;
588   int i;
589
590   assert (r->res);
591
592   n_vars = PQnfields (r->res);
593
594   if ( r->tuple >= PQntuples (r->res))
595     return NULL;
596
597   c = case_create (r->proto);
598   case_set_missing (c);
599
600
601   for (i = 0 ; i < n_vars ; ++i )
602     {
603       Oid type = PQftype (r->res, i);
604       const struct variable *v = r->vmap[i];
605       union value *val = case_data_rw (c, v);
606
607       union value *val1 = NULL;
608
609       switch (type)
610         {
611         case INTERVALOID:
612         case TIMESTAMPTZOID:
613         case TIMETZOID:
614           if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
615             {
616               const struct variable *v1 = NULL;
617               v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
618
619               val1 = case_data_rw (c, v1);
620             }
621           break;
622         default:
623           break;
624         }
625
626
627       if (PQgetisnull (r->res, r->tuple, i))
628         {
629           value_set_missing (val, var_get_width (v));
630
631           switch (type)
632             {
633             case INTERVALOID:
634             case TIMESTAMPTZOID:
635             case TIMETZOID:
636               val1->f = SYSMIS;
637               break;
638             default:
639               break;
640             }
641         }
642       else
643         {
644           const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
645           int length = PQgetlength (r->res, r->tuple, i);
646
647           int var_width = var_get_width (v);
648           switch (type)
649             {
650             case BOOLOID:
651               {
652                 int8_t x;
653                 GET_VALUE (&vptr, x);
654                 val->f = x;
655               }
656               break;
657
658             case OIDOID:
659             case INT2OID:
660               {
661                 int16_t x;
662                 GET_VALUE (&vptr, x);
663                 val->f = x;
664               }
665               break;
666
667             case INT4OID:
668               {
669                 int32_t x;
670                 GET_VALUE (&vptr, x);
671                 val->f = x;
672               }
673               break;
674
675             case INT8OID:
676               {
677                 int64_t x;
678                 GET_VALUE (&vptr, x);
679                 val->f = x;
680               }
681               break;
682
683             case FLOAT4OID:
684               {
685                 float n;
686                 GET_VALUE (&vptr, n);
687                 val->f = n;
688               }
689               break;
690
691             case FLOAT8OID:
692               {
693                 double n;
694                 GET_VALUE (&vptr, n);
695                 val->f = n;
696               }
697               break;
698
699             case CASHOID:
700               {
701                 /* Postgres 8.3 uses 64 bits.
702                    Earlier versions use 32 */
703                 switch (length)
704                   {
705                   case 8:
706                     {
707                       int64_t x;
708                       GET_VALUE (&vptr, x);
709                       val->f = x / 100.0;
710                     }
711                     break;
712                   case 4:
713                     {
714                       int32_t x;
715                       GET_VALUE (&vptr, x);
716                       val->f = x / 100.0;
717                     }
718                     break;
719                   default:
720                     val->f = SYSMIS;
721                     break;
722                   }
723               }
724               break;
725
726             case INTERVALOID:
727               {
728                 if ( r->integer_datetimes )
729                   {
730                     uint32_t months;
731                     uint32_t days;
732                     uint32_t us;
733                     uint32_t things;
734
735                     GET_VALUE (&vptr, things);
736                     GET_VALUE (&vptr, us);
737                     GET_VALUE (&vptr, days);
738                     GET_VALUE (&vptr, months);
739
740                     val->f = us / 1000000.0;
741                     val->f += days * 24 * 3600;
742
743                     val1->f = months;
744                   }
745                 else
746                   {
747                     uint32_t days, months;
748                     double seconds;
749
750                     GET_VALUE (&vptr, seconds);
751                     GET_VALUE (&vptr, days);
752                     GET_VALUE (&vptr, months);
753
754                     val->f = seconds;
755                     val->f += days * 24 * 3600;
756
757                     val1->f = months;
758                   }
759               }
760               break;
761
762             case DATEOID:
763               {
764                 int32_t x;
765
766                 GET_VALUE (&vptr, x);
767
768                 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
769               }
770               break;
771
772             case TIMEOID:
773               {
774                 if ( r->integer_datetimes)
775                   {
776                     uint64_t x;
777                     GET_VALUE (&vptr, x);
778                     val->f = x / 1000000.0;
779                   }
780                 else
781                   {
782                     double x;
783                     GET_VALUE (&vptr, x);
784                     val->f = x;
785                   }
786               }
787               break;
788
789             case TIMETZOID:
790               {
791                 int32_t zone;
792                 if ( r->integer_datetimes)
793                   {
794                     uint64_t x;
795
796
797                     GET_VALUE (&vptr, x);
798                     val->f = x / 1000000.0;
799                   }
800                 else
801                   {
802                     double x;
803
804                     GET_VALUE (&vptr, x);
805                     val->f = x ;
806                   }
807
808                 GET_VALUE (&vptr, zone);
809                 val1->f = zone / 3600.0;
810               }
811               break;
812
813             case TIMESTAMPOID:
814             case TIMESTAMPTZOID:
815               {
816                 if ( r->integer_datetimes)
817                   {
818                     int64_t x;
819
820                     GET_VALUE (&vptr, x);
821
822                     x /= 1000000;
823
824                     val->f = (x + r->postgres_epoch * 24 * 3600 );
825                   }
826                 else
827                   {
828                     double x;
829
830                     GET_VALUE (&vptr, x);
831
832                     val->f = (x + r->postgres_epoch * 24 * 3600 );
833                   }
834               }
835               break;
836             case TEXTOID:
837             case VARCHAROID:
838             case BPCHAROID:
839             case BYTEAOID:
840               memcpy (value_str_rw (val, var_width), vptr,
841                       MIN (length, var_width));
842               break;
843
844             case NUMERICOID:
845               {
846                 double f = 0.0;
847                 int i;
848                 int16_t n_digits, weight, dscale;
849                 uint16_t sign;
850
851                 GET_VALUE (&vptr, n_digits);
852                 GET_VALUE (&vptr, weight);
853                 GET_VALUE (&vptr, sign);
854                 GET_VALUE (&vptr, dscale);
855
856 #if 0
857                 {
858                   struct fmt_spec fmt;
859                   fmt.d = dscale;
860                   fmt.type = FMT_E;
861                   fmt.w = fmt_max_output_width (fmt.type) ;
862                   fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
863                   var_set_both_formats (v, &fmt);
864                 }
865 #endif
866
867                 for (i = 0 ; i < n_digits;  ++i)
868                   {
869                     uint16_t x;
870                     GET_VALUE (&vptr, x);
871                     f += x * pow (10000, weight--);
872                   }
873
874                 if ( sign == 0x4000)
875                   f *= -1.0;
876
877                 if ( sign == 0xC000)
878                   val->f = SYSMIS;
879                 else
880                   val->f = f;
881               }
882               break;
883
884             default:
885               val->f = SYSMIS;
886               break;
887             }
888         }
889     }
890
891   r->tuple++;
892
893   return c;
894 }
895
896 #endif