244248a20aeecdf34bcabedbbf188047e5c9b71c
[pspp-builds.git] / src / data / psql-reader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2008 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader-provider.h>
20 #include <libpspp/message.h>
21 #include <gl/xalloc.h>
22 #include <data/dictionary.h>
23 #include <stdlib.h>
24
25 #include "psql-reader.h"
26 #include "variable.h"
27 #include "format.h"
28 #include "calendar.h"
29
30 #include <inttypes.h>
31 #include <libpspp/str.h>
32
33 #include "gettext.h"
34 #define _(msgid) gettext (msgid)
35 #define N_(msgid) (msgid)
36
37
38 #if !PSQL_SUPPORT
39 struct casereader *
40 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
41 {
42   msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
43
44   return NULL;
45 }
46
47 #else
48
49 #include <stdint.h>
50 #include <libpq-fe.h>
51
52
53 /* These macros  must be the same as in catalog/pg_types.h from the postgres source */
54 #define BOOLOID            16
55 #define BYTEAOID           17
56 #define CHAROID            18
57 #define NAMEOID            19
58 #define INT8OID            20
59 #define INT2OID            21
60 #define INT4OID            23
61 #define TEXTOID            25
62 #define OIDOID             26
63 #define FLOAT4OID          700
64 #define FLOAT8OID          701
65 #define CASHOID            790
66 #define BPCHAROID          1042
67 #define VARCHAROID         1043
68 #define DATEOID            1082
69 #define TIMEOID            1083
70 #define TIMESTAMPOID       1114
71 #define TIMESTAMPTZOID     1184
72 #define INTERVALOID        1186
73 #define TIMETZOID          1266
74 #define NUMERICOID         1700
75
76 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
77
78 static bool psql_casereader_read (struct casereader *, void *,
79                                   struct ccase *);
80
81 static const struct casereader_class psql_casereader_class =
82   {
83     psql_casereader_read,
84     psql_casereader_destroy,
85     NULL,
86     NULL,
87   };
88
89 struct psql_reader
90 {
91   PGconn *conn;
92   PGresult *res;
93   int tuple;
94
95   bool integer_datetimes;
96
97   double postgres_epoch;
98
99   size_t value_cnt;
100   struct dictionary *dict;
101
102   /* An array of ints, which maps psql column numbers into
103      pspp variables */
104   struct variable **vmap;
105   size_t vmapsize;
106
107   struct string fetch_cmd;
108   int cache_size;
109 };
110
111
112 static bool set_value (struct psql_reader *r,
113                        struct ccase *cc);
114
115
116
117 #if WORDS_BIGENDIAN
118 static void
119 data_to_native (const void *in_, void *out_, int len)
120 {
121   int i;
122   const unsigned char *in = in_;
123   unsigned char *out = out_;
124   for (i = 0 ; i < len ; ++i )
125     out[i] = in[i];
126 }
127 #else
128 static void
129 data_to_native (const void *in_, void *out_, int len)
130 {
131   int i;
132   const unsigned char *in = in_;
133   unsigned char *out = out_;
134   for (i = 0 ; i < len ; ++i )
135     out[len - i - 1] = in[i];
136 }
137 #endif
138
139
140 #define GET_VALUE(IN, OUT) do { \
141     size_t sz = sizeof (OUT); \
142     data_to_native (*(IN), &(OUT), sz) ; \
143     (*IN) += sz; \
144 } while (false)
145
146
147 #if 0
148 static void
149 dump (const unsigned char *x, int l)
150 {
151   int i;
152
153   for (i = 0; i < l ; ++i)
154     {
155       printf ("%02x ", x[i]);
156     }
157
158   putchar ('\n');
159
160   for (i = 0; i < l ; ++i)
161     {
162       if ( isprint (x[i]))
163         printf ("%c ", x[i]);
164       else
165         printf ("   ");
166     }
167
168   putchar ('\n');
169 }
170 #endif
171
172 static struct variable *
173 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
174             int width, const char *suggested_name, int col)
175 {
176   unsigned long int vx = 0;
177   struct variable *var;
178   char name[VAR_NAME_LEN + 1];
179
180   r->value_cnt += value_cnt_from_width (width);
181
182   if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
183     {
184       msg (ME, _("Cannot create variable name from %s"), suggested_name);
185       return NULL;
186     }
187
188   var = dict_create_var (r->dict, name, width);
189   var_set_both_formats (var, fmt);
190
191   if ( col != -1)
192     {
193       r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
194
195       r->vmap[col] = var;
196       r->vmapsize = col + 1;
197     }
198
199   return var;
200 }
201
202
203
204
205 /* Fill the cache */
206 static bool
207 reload_cache (struct psql_reader *r)
208 {
209   PQclear (r->res);
210   r->tuple = 0;
211
212   r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
213
214   if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
215     {
216       PQclear (r->res);
217       r->res = NULL;
218       return false;
219     }
220
221   return true;
222 }
223
224
225 struct casereader *
226 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
227 {
228   int i;
229   int n_fields, n_tuples;
230   PGresult *qres = NULL;
231   casenumber n_cases = CASENUMBER_MAX;
232
233   struct psql_reader *r = xzalloc (sizeof *r);
234   struct string query ;
235
236   r->conn = PQconnectdb (info->conninfo);
237   if ( NULL == r->conn)
238     {
239       msg (ME, _("Memory error whilst opening psql source"));
240       goto error;
241     }
242
243   if ( PQstatus (r->conn) != CONNECTION_OK )
244     {
245       msg (ME, _("Error opening psql source: %s."),
246            PQerrorMessage (r->conn));
247
248       goto error;
249     }
250
251   {
252     int ver_num;
253     const char *vers = PQparameterStatus (r->conn, "server_version");
254
255     sscanf (vers, "%d", &ver_num);
256
257     if ( ver_num < 8)
258       {
259         msg (ME,
260              _("Postgres server is version %s."
261                " Reading from versions earlier than 8.0 is not supported."),
262              vers);
263
264         goto error;
265       }
266   }
267
268   {
269     const char *dt =  PQparameterStatus (r->conn, "integer_datetimes");
270
271     r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
272   }
273
274 #if USE_SSL
275   if ( PQgetssl (r->conn) == NULL)
276 #endif
277     {
278       if (! info->allow_clear)
279         {
280           msg (ME, _("Connection is unencrypted, "
281                      "but unencrypted connections have not been permitted."));
282           goto error;
283         }
284     }
285
286   r->postgres_epoch =
287     calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
288
289
290   /* Create the dictionary and populate it */
291   *dict = r->dict = dict_create ();
292
293   /*
294     select count (*) from (select * from medium) stupid_sql_standard;
295   */
296
297   ds_init_cstr (&query,
298                 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
299                 "DECLARE  pspp BINARY CURSOR FOR ");
300
301   ds_put_substring (&query, info->sql.ss);
302
303   qres = PQexec (r->conn, ds_cstr (&query));
304   ds_destroy (&query);
305   if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
306     {
307       msg (ME, _("Error from psql source: %s."),
308            PQresultErrorMessage (qres));
309       goto error;
310     }
311
312   PQclear (qres);
313
314
315   /* Now use the count() function to find the total number of cases
316      that this query returns.
317      Doing this incurs some overhead.  The server has to iterate every
318      case in order to find this number.  However, it's performed on the
319      server side, and in all except the most huge databases the extra
320      overhead will be worth the effort.
321      On the other hand, most PSPP functions don't need to know this.
322      The GUI is the notable exception.
323   */
324   ds_init_cstr (&query, "SELECT count (*) FROM (");
325   ds_put_substring (&query, info->sql.ss);
326   ds_put_cstr (&query, ") stupid_sql_standard");
327
328   qres = PQexec (r->conn, ds_cstr (&query));
329   ds_destroy (&query);
330   if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
331     {
332       msg (ME, _("Error from psql source: %s."),
333            PQresultErrorMessage (qres));
334       goto error;
335     }
336   n_cases = atol (PQgetvalue (qres, 0, 0));
337   PQclear (qres);
338
339   qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
340   if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
341     {
342       msg (ME, _("Error from psql source: %s."),
343            PQresultErrorMessage (qres));
344       goto error;
345     }
346
347   n_tuples = PQntuples (qres);
348   n_fields = PQnfields (qres);
349
350   r->value_cnt = 0;
351   r->vmap = NULL;
352   r->vmapsize = 0;
353
354   for (i = 0 ; i < n_fields ; ++i )
355     {
356       struct variable *var;
357       struct fmt_spec fmt = {FMT_F, 8, 2};
358       Oid type = PQftype (qres, i);
359       int width = 0;
360       int length ;
361
362       /* If there are no data then make a finger in the air 
363          guess at the contents */
364       if ( n_tuples > 0 )
365         length = PQgetlength (qres, 0, i);
366       else 
367         length = MAX_SHORT_STRING;
368
369       switch (type)
370         {
371         case BOOLOID:
372         case OIDOID:
373         case INT2OID:
374         case INT4OID:
375         case INT8OID:
376         case FLOAT4OID:
377         case FLOAT8OID:
378           fmt.type = FMT_F;
379           break;
380         case CASHOID:
381           fmt.type = FMT_DOLLAR;
382           break;
383         case CHAROID:
384           fmt.type = FMT_A;
385           width = length > 0 ? length : 1;
386           fmt.d = 0;
387           fmt.w = 1;
388           break;
389         case TEXTOID:
390         case VARCHAROID:
391         case BPCHAROID:
392           fmt.type = FMT_A;
393           width = (info->str_width == -1) ?
394             ROUND_UP (length, MAX_SHORT_STRING) : info->str_width;
395           fmt.w = width;
396           fmt.d = 0;
397           break;
398         case BYTEAOID:
399           fmt.type = FMT_AHEX;
400           width = length > 0 ? length : MAX_SHORT_STRING;
401           fmt.w = width * 2;
402           fmt.d = 0;
403           break;
404         case INTERVALOID:
405           fmt.type = FMT_DTIME;
406           width = 0;
407           fmt.d = 0;
408           fmt.w = 13;
409           break;
410         case DATEOID:
411           fmt.type = FMT_DATE;
412           width = 0;
413           fmt.w = 11;
414           fmt.d = 0;
415           break;
416         case TIMEOID:
417         case TIMETZOID:
418           fmt.type = FMT_TIME;
419           width = 0;
420           fmt.w = 11;
421           fmt.d = 0;
422           break;
423         case TIMESTAMPOID:
424         case TIMESTAMPTZOID:
425           fmt.type = FMT_DATETIME;
426           fmt.d = 0;
427           fmt.w = 22;
428           width = 0;
429           break;
430         case NUMERICOID:
431           fmt.type = FMT_E;
432           fmt.d = 2;
433           fmt.w = 40;
434           width = 0;
435           break;
436         default:
437           msg (MW, _("Unsupported OID %d.  SYSMIS values will be inserted."), type);
438           fmt.type = FMT_A;
439           width = length > 0 ? length : MAX_SHORT_STRING;
440           fmt.w = width ;
441           fmt.d = 0;
442           break;
443         }
444
445       var = create_var (r, &fmt, width, PQfname (qres, i), i);
446       if ( type == NUMERICOID && n_tuples > 0)
447         {
448           const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
449           struct fmt_spec fmt;
450           int16_t n_digits, weight, dscale;
451           uint16_t sign;
452
453           GET_VALUE (&vptr, n_digits);
454           GET_VALUE (&vptr, weight);
455           GET_VALUE (&vptr, sign);
456           GET_VALUE (&vptr, dscale);
457
458           fmt.d = dscale;
459           fmt.type = FMT_E;
460           fmt.w = fmt_max_output_width (fmt.type) ;
461           fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
462           var_set_both_formats (var, &fmt);
463         }
464
465       /* Timezones need an extra variable */
466       switch (type)
467         {
468         case TIMETZOID:
469           {
470             struct string name;
471             ds_init_cstr (&name, var_get_name (var));
472             ds_put_cstr (&name, "-zone");
473             fmt.type = FMT_F;
474             fmt.w = 8;
475             fmt.d = 2;
476
477             create_var (r, &fmt, 0, ds_cstr (&name), -1);
478
479             ds_destroy (&name);
480           }
481           break;
482
483         case INTERVALOID:
484           {
485             struct string name;
486             ds_init_cstr (&name, var_get_name (var));
487             ds_put_cstr (&name, "-months");
488             fmt.type = FMT_F;
489             fmt.w = 3;
490             fmt.d = 0;
491
492             create_var (r, &fmt, 0, ds_cstr (&name), -1);
493
494             ds_destroy (&name);
495           }
496         default:
497           break;
498         }
499     }
500
501   PQclear (qres);
502
503   qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
504   if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
505     {
506       PQclear (qres);
507       goto error;
508     }
509   PQclear (qres);
510
511   r->cache_size = info->bsize != -1 ? info->bsize: 4096;
512
513   ds_init_empty (&r->fetch_cmd);
514   ds_put_format (&r->fetch_cmd,  "FETCH FORWARD %d FROM pspp", r->cache_size);
515
516   reload_cache (r);
517
518   return casereader_create_sequential
519     (NULL,
520      r->value_cnt,
521      n_cases,
522      &psql_casereader_class, r);
523
524  error:
525   dict_destroy (*dict);
526
527   psql_casereader_destroy (NULL, r);
528   return NULL;
529 }
530
531
532 static void
533 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
534 {
535   struct psql_reader *r = r_;
536   if (r == NULL)
537     return ;
538
539   ds_destroy (&r->fetch_cmd);
540   free (r->vmap);
541   if (r->res) PQclear (r->res);
542   PQfinish (r->conn);
543
544   free (r);
545 }
546
547
548
549 static bool
550 psql_casereader_read (struct casereader *reader UNUSED, void *r_,
551                       struct ccase *cc)
552 {
553   struct psql_reader *r = r_;
554
555   if ( NULL == r->res || r->tuple >= r->cache_size)
556     {
557       if ( ! reload_cache (r) )
558         return false;
559     }
560
561   return set_value (r, cc);
562 }
563
564 static bool
565 set_value (struct psql_reader *r,
566            struct ccase *c)
567 {
568   int i;
569   int n_vars;
570
571   assert (r->res);
572
573   n_vars = PQnfields (r->res);
574
575   if ( r->tuple >= PQntuples (r->res))
576     return false;
577
578   case_create (c, r->value_cnt);
579   memset (case_data_rw_idx (c, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
580
581
582   for (i = 0 ; i < n_vars ; ++i )
583     {
584       Oid type = PQftype (r->res, i);
585       const struct variable *v = r->vmap[i];
586       union value *val = case_data_rw (c, v);
587
588       union value *val1 = NULL;
589
590       switch (type)
591         {
592         case INTERVALOID:
593         case TIMESTAMPTZOID:
594         case TIMETZOID:
595           if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
596             {
597               const struct variable *v1 = NULL;
598               v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
599
600               val1 = case_data_rw (c, v1);
601             }
602           break;
603         default:
604           break;
605         }
606
607
608       if (PQgetisnull (r->res, r->tuple, i))
609         {
610           value_set_missing (val, var_get_width (v));
611
612           switch (type)
613             {
614             case INTERVALOID:
615             case TIMESTAMPTZOID:
616             case TIMETZOID:
617               val1->f = SYSMIS;
618               break;
619             default:
620               break;
621             }
622         }
623       else
624         {
625           const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
626           int length = PQgetlength (r->res, r->tuple, i);
627
628           int var_width = var_get_width (v);
629           switch (type)
630             {
631             case BOOLOID:
632               {
633                 int8_t x;
634                 GET_VALUE (&vptr, x);
635                 val->f = x;
636               }
637               break;
638
639             case OIDOID:
640             case INT2OID:
641               {
642                 int16_t x;
643                 GET_VALUE (&vptr, x);
644                 val->f = x;
645               }
646               break;
647
648             case INT4OID:
649               {
650                 int32_t x;
651                 GET_VALUE (&vptr, x);
652                 val->f = x;
653               }
654               break;
655
656             case INT8OID:
657               {
658                 int64_t x;
659                 GET_VALUE (&vptr, x);
660                 val->f = x;
661               }
662               break;
663
664             case FLOAT4OID:
665               {
666                 float n;
667                 GET_VALUE (&vptr, n);
668                 val->f = n;
669               }
670               break;
671
672             case FLOAT8OID:
673               {
674                 double n;
675                 GET_VALUE (&vptr, n);
676                 val->f = n;
677               }
678               break;
679
680             case CASHOID:
681               {
682                 /* Postgres 8.3 uses 64 bits.
683                    Earlier versions use 32 */
684                 switch (length)
685                   {
686                   case 8:
687                     {
688                       int64_t x;
689                       GET_VALUE (&vptr, x);
690                       val->f = x / 100.0;
691                     }
692                     break;
693                   case 4:
694                     {
695                       int32_t x;
696                       GET_VALUE (&vptr, x);
697                       val->f = x / 100.0;
698                     }
699                     break;
700                   default:
701                     val->f = SYSMIS;
702                     break;
703                   }
704               }
705               break;
706
707             case INTERVALOID:
708               {
709                 if ( r->integer_datetimes )
710                   {
711                     uint32_t months;
712                     uint32_t days;
713                     uint32_t us;
714                     uint32_t things;
715
716                     GET_VALUE (&vptr, things);
717                     GET_VALUE (&vptr, us);
718                     GET_VALUE (&vptr, days);
719                     GET_VALUE (&vptr, months);
720
721                     val->f = us / 1000000.0;
722                     val->f += days * 24 * 3600;
723
724                     val1->f = months;
725                   }
726                 else
727                   {
728                     uint32_t days, months;
729                     double seconds;
730
731                     GET_VALUE (&vptr, seconds);
732                     GET_VALUE (&vptr, days);
733                     GET_VALUE (&vptr, months);
734
735                     val->f = seconds;
736                     val->f += days * 24 * 3600;
737
738                     val1->f = months;
739                   }
740               }
741               break;
742
743             case DATEOID:
744               {
745                 int32_t x;
746
747                 GET_VALUE (&vptr, x);
748
749                 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
750               }
751               break;
752
753             case TIMEOID:
754               {
755                 if ( r->integer_datetimes)
756                   {
757                     uint64_t x;
758                     GET_VALUE (&vptr, x);
759                     val->f = x / 1000000.0;
760                   }
761                 else
762                   {
763                     double x;
764                     GET_VALUE (&vptr, x);
765                     val->f = x;
766                   }
767               }
768               break;
769
770             case TIMETZOID:
771               {
772                 int32_t zone;
773                 if ( r->integer_datetimes)
774                   {
775                     uint64_t x;
776
777
778                     GET_VALUE (&vptr, x);
779                     val->f = x / 1000000.0;
780                   }
781                 else
782                   {
783                     double x;
784
785                     GET_VALUE (&vptr, x);
786                     val->f = x ;
787                   }
788
789                 GET_VALUE (&vptr, zone);
790                 val1->f = zone / 3600.0;
791               }
792               break;
793
794             case TIMESTAMPOID:
795             case TIMESTAMPTZOID:
796               {
797                 if ( r->integer_datetimes)
798                   {
799                     int64_t x;
800
801                     GET_VALUE (&vptr, x);
802
803                     x /= 1000000;
804
805                     val->f = (x + r->postgres_epoch * 24 * 3600 );
806                   }
807                 else
808                   {
809                     double x;
810
811                     GET_VALUE (&vptr, x);
812
813                     val->f = (x + r->postgres_epoch * 24 * 3600 );
814                   }
815               }
816               break;
817             case TEXTOID:
818             case VARCHAROID:
819             case BPCHAROID:
820             case BYTEAOID:
821               memcpy (val->s, (char *) vptr, MIN (length, var_width));
822               break;
823
824             case NUMERICOID:
825               {
826                 double f = 0.0;
827                 int i;
828                 int16_t n_digits, weight, dscale;
829                 uint16_t sign;
830
831                 GET_VALUE (&vptr, n_digits);
832                 GET_VALUE (&vptr, weight);
833                 GET_VALUE (&vptr, sign);
834                 GET_VALUE (&vptr, dscale);
835
836 #if 0
837                 {
838                   struct fmt_spec fmt;
839                   fmt.d = dscale;
840                   fmt.type = FMT_E;
841                   fmt.w = fmt_max_output_width (fmt.type) ;
842                   fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
843                   var_set_both_formats (v, &fmt);
844                 }
845 #endif
846
847                 for (i = 0 ; i < n_digits;  ++i)
848                   {
849                     uint16_t x;
850                     GET_VALUE (&vptr, x);
851                     f += x * pow (10000, weight--);
852                   }
853
854                 if ( sign == 0x4000)
855                   f *= -1.0;
856
857                 if ( sign == 0xC000)
858                   val->f = SYSMIS;
859                 else
860                   val->f = f;
861               }
862               break;
863
864             default:
865               val->f = SYSMIS;
866               break;
867             }
868         }
869     }
870
871   r->tuple++;
872
873   return true;
874 }
875
876 #endif