Merge commit 'origin/stable'
[pspp-builds.git] / src / data / psql-reader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2008, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader-provider.h>
20 #include <libpspp/message.h>
21 #include <gl/xalloc.h>
22 #include <data/dictionary.h>
23 #include <stdlib.h>
24
25 #include "psql-reader.h"
26 #include "variable.h"
27 #include "format.h"
28 #include "calendar.h"
29
30 #include <inttypes.h>
31 #include <libpspp/str.h>
32
33 #include "gettext.h"
34 #define _(msgid) gettext (msgid)
35 #define N_(msgid) (msgid)
36
37
38 #if !PSQL_SUPPORT
39 struct casereader *
40 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
41 {
42   msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
43
44   return NULL;
45 }
46
47 #else
48
49 #include <stdint.h>
50 #include <libpq-fe.h>
51
52
53 /* These macros  must be the same as in catalog/pg_types.h from the postgres source */
54 #define BOOLOID            16
55 #define BYTEAOID           17
56 #define CHAROID            18
57 #define NAMEOID            19
58 #define INT8OID            20
59 #define INT2OID            21
60 #define INT4OID            23
61 #define TEXTOID            25
62 #define OIDOID             26
63 #define FLOAT4OID          700
64 #define FLOAT8OID          701
65 #define CASHOID            790
66 #define BPCHAROID          1042
67 #define VARCHAROID         1043
68 #define DATEOID            1082
69 #define TIMEOID            1083
70 #define TIMESTAMPOID       1114
71 #define TIMESTAMPTZOID     1184
72 #define INTERVALOID        1186
73 #define TIMETZOID          1266
74 #define NUMERICOID         1700
75
76 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
77
78 static struct ccase *psql_casereader_read (struct casereader *, void *);
79
80 static const struct casereader_class psql_casereader_class =
81   {
82     psql_casereader_read,
83     psql_casereader_destroy,
84     NULL,
85     NULL,
86   };
87
88 struct psql_reader
89 {
90   PGconn *conn;
91   PGresult *res;
92   int tuple;
93
94   bool integer_datetimes;
95
96   double postgres_epoch;
97
98   size_t value_cnt;
99   struct dictionary *dict;
100
101   /* An array of ints, which maps psql column numbers into
102      pspp variables */
103   struct variable **vmap;
104   size_t vmapsize;
105
106   struct string fetch_cmd;
107   int cache_size;
108 };
109
110
111 static struct ccase *set_value (struct psql_reader *r);
112
113
114
115 #if WORDS_BIGENDIAN
116 static void
117 data_to_native (const void *in_, void *out_, int len)
118 {
119   int i;
120   const unsigned char *in = in_;
121   unsigned char *out = out_;
122   for (i = 0 ; i < len ; ++i )
123     out[i] = in[i];
124 }
125 #else
126 static void
127 data_to_native (const void *in_, void *out_, int len)
128 {
129   int i;
130   const unsigned char *in = in_;
131   unsigned char *out = out_;
132   for (i = 0 ; i < len ; ++i )
133     out[len - i - 1] = in[i];
134 }
135 #endif
136
137
138 #define GET_VALUE(IN, OUT) do { \
139     size_t sz = sizeof (OUT); \
140     data_to_native (*(IN), &(OUT), sz) ; \
141     (*IN) += sz; \
142 } while (false)
143
144
145 #if 0
146 static void
147 dump (const unsigned char *x, int l)
148 {
149   int i;
150
151   for (i = 0; i < l ; ++i)
152     {
153       printf ("%02x ", x[i]);
154     }
155
156   putchar ('\n');
157
158   for (i = 0; i < l ; ++i)
159     {
160       if ( isprint (x[i]))
161         printf ("%c ", x[i]);
162       else
163         printf ("   ");
164     }
165
166   putchar ('\n');
167 }
168 #endif
169
170 static struct variable *
171 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
172             int width, const char *suggested_name, int col)
173 {
174   unsigned long int vx = 0;
175   struct variable *var;
176   char name[VAR_NAME_LEN + 1];
177
178   r->value_cnt += value_cnt_from_width (width);
179
180   if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
181     {
182       msg (ME, _("Cannot create variable name from %s"), suggested_name);
183       return NULL;
184     }
185
186   var = dict_create_var (r->dict, name, width);
187   var_set_both_formats (var, fmt);
188
189   if ( col != -1)
190     {
191       r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
192
193       r->vmap[col] = var;
194       r->vmapsize = col + 1;
195     }
196
197   return var;
198 }
199
200
201
202
203 /* Fill the cache */
204 static bool
205 reload_cache (struct psql_reader *r)
206 {
207   PQclear (r->res);
208   r->tuple = 0;
209
210   r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
211
212   if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
213     {
214       PQclear (r->res);
215       r->res = NULL;
216       return false;
217     }
218
219   return true;
220 }
221
222
223 struct casereader *
224 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
225 {
226   int i;
227   int n_fields, n_tuples;
228   PGresult *qres = NULL;
229   casenumber n_cases = CASENUMBER_MAX;
230
231   struct psql_reader *r = xzalloc (sizeof *r);
232   struct string query ;
233
234   r->conn = PQconnectdb (info->conninfo);
235   if ( NULL == r->conn)
236     {
237       msg (ME, _("Memory error whilst opening psql source"));
238       goto error;
239     }
240
241   if ( PQstatus (r->conn) != CONNECTION_OK )
242     {
243       msg (ME, _("Error opening psql source: %s."),
244            PQerrorMessage (r->conn));
245
246       goto error;
247     }
248
249   {
250     int ver_num;
251     const char *vers = PQparameterStatus (r->conn, "server_version");
252
253     sscanf (vers, "%d", &ver_num);
254
255     if ( ver_num < 8)
256       {
257         msg (ME,
258              _("Postgres server is version %s."
259                " Reading from versions earlier than 8.0 is not supported."),
260              vers);
261
262         goto error;
263       }
264   }
265
266   {
267     const char *dt =  PQparameterStatus (r->conn, "integer_datetimes");
268
269     r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
270   }
271
272 #if USE_SSL
273   if ( PQgetssl (r->conn) == NULL)
274 #endif
275     {
276       if (! info->allow_clear)
277         {
278           msg (ME, _("Connection is unencrypted, "
279                      "but unencrypted connections have not been permitted."));
280           goto error;
281         }
282     }
283
284   r->postgres_epoch =
285     calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
286
287
288   /* Create the dictionary and populate it */
289   *dict = r->dict = dict_create ();
290
291   /*
292     select count (*) from (select * from medium) stupid_sql_standard;
293   */
294
295   ds_init_cstr (&query,
296                 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
297                 "DECLARE  pspp BINARY CURSOR FOR ");
298
299   ds_put_substring (&query, info->sql.ss);
300
301   qres = PQexec (r->conn, ds_cstr (&query));
302   ds_destroy (&query);
303   if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
304     {
305       msg (ME, _("Error from psql source: %s."),
306            PQresultErrorMessage (qres));
307       goto error;
308     }
309
310   PQclear (qres);
311
312
313   /* Now use the count() function to find the total number of cases
314      that this query returns.
315      Doing this incurs some overhead.  The server has to iterate every
316      case in order to find this number.  However, it's performed on the
317      server side, and in all except the most huge databases the extra
318      overhead will be worth the effort.
319      On the other hand, most PSPP functions don't need to know this.
320      The GUI is the notable exception.
321   */
322   ds_init_cstr (&query, "SELECT count (*) FROM (");
323   ds_put_substring (&query, info->sql.ss);
324   ds_put_cstr (&query, ") stupid_sql_standard");
325
326   qres = PQexec (r->conn, ds_cstr (&query));
327   ds_destroy (&query);
328   if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
329     {
330       msg (ME, _("Error from psql source: %s."),
331            PQresultErrorMessage (qres));
332       goto error;
333     }
334   n_cases = atol (PQgetvalue (qres, 0, 0));
335   PQclear (qres);
336
337   qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
338   if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
339     {
340       msg (ME, _("Error from psql source: %s."),
341            PQresultErrorMessage (qres));
342       goto error;
343     }
344
345   n_tuples = PQntuples (qres);
346   n_fields = PQnfields (qres);
347
348   r->value_cnt = 0;
349   r->vmap = NULL;
350   r->vmapsize = 0;
351
352   for (i = 0 ; i < n_fields ; ++i )
353     {
354       struct variable *var;
355       struct fmt_spec fmt = {FMT_F, 8, 2};
356       Oid type = PQftype (qres, i);
357       int width = 0;
358       int length ;
359
360       /* If there are no data then make a finger in the air 
361          guess at the contents */
362       if ( n_tuples > 0 )
363         length = PQgetlength (qres, 0, i);
364       else 
365         length = MAX_SHORT_STRING;
366
367       switch (type)
368         {
369         case BOOLOID:
370         case OIDOID:
371         case INT2OID:
372         case INT4OID:
373         case INT8OID:
374         case FLOAT4OID:
375         case FLOAT8OID:
376           fmt.type = FMT_F;
377           break;
378         case CASHOID:
379           fmt.type = FMT_DOLLAR;
380           break;
381         case CHAROID:
382           fmt.type = FMT_A;
383           width = length > 0 ? length : 1;
384           fmt.d = 0;
385           fmt.w = 1;
386           break;
387         case TEXTOID:
388         case VARCHAROID:
389         case BPCHAROID:
390           fmt.type = FMT_A;
391           width = (info->str_width == -1) ?
392             ROUND_UP (length, MAX_SHORT_STRING) : info->str_width;
393           fmt.w = width;
394           fmt.d = 0;
395           break;
396         case BYTEAOID:
397           fmt.type = FMT_AHEX;
398           width = length > 0 ? length : MAX_SHORT_STRING;
399           fmt.w = width * 2;
400           fmt.d = 0;
401           break;
402         case INTERVALOID:
403           fmt.type = FMT_DTIME;
404           width = 0;
405           fmt.d = 0;
406           fmt.w = 13;
407           break;
408         case DATEOID:
409           fmt.type = FMT_DATE;
410           width = 0;
411           fmt.w = 11;
412           fmt.d = 0;
413           break;
414         case TIMEOID:
415         case TIMETZOID:
416           fmt.type = FMT_TIME;
417           width = 0;
418           fmt.w = 11;
419           fmt.d = 0;
420           break;
421         case TIMESTAMPOID:
422         case TIMESTAMPTZOID:
423           fmt.type = FMT_DATETIME;
424           fmt.d = 0;
425           fmt.w = 22;
426           width = 0;
427           break;
428         case NUMERICOID:
429           fmt.type = FMT_E;
430           fmt.d = 2;
431           fmt.w = 40;
432           width = 0;
433           break;
434         default:
435           msg (MW, _("Unsupported OID %d.  SYSMIS values will be inserted."), type);
436           fmt.type = FMT_A;
437           width = length > 0 ? length : MAX_SHORT_STRING;
438           fmt.w = width ;
439           fmt.d = 0;
440           break;
441         }
442
443       if ( width == 0 && fmt_is_string (fmt.type))
444         fmt.w = width = MAX_SHORT_STRING;
445
446
447       var = create_var (r, &fmt, width, PQfname (qres, i), i);
448       if ( type == NUMERICOID && n_tuples > 0)
449         {
450           const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
451           struct fmt_spec fmt;
452           int16_t n_digits, weight, dscale;
453           uint16_t sign;
454
455           GET_VALUE (&vptr, n_digits);
456           GET_VALUE (&vptr, weight);
457           GET_VALUE (&vptr, sign);
458           GET_VALUE (&vptr, dscale);
459
460           fmt.d = dscale;
461           fmt.type = FMT_E;
462           fmt.w = fmt_max_output_width (fmt.type) ;
463           fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
464           var_set_both_formats (var, &fmt);
465         }
466
467       /* Timezones need an extra variable */
468       switch (type)
469         {
470         case TIMETZOID:
471           {
472             struct string name;
473             ds_init_cstr (&name, var_get_name (var));
474             ds_put_cstr (&name, "-zone");
475             fmt.type = FMT_F;
476             fmt.w = 8;
477             fmt.d = 2;
478
479             create_var (r, &fmt, 0, ds_cstr (&name), -1);
480
481             ds_destroy (&name);
482           }
483           break;
484
485         case INTERVALOID:
486           {
487             struct string name;
488             ds_init_cstr (&name, var_get_name (var));
489             ds_put_cstr (&name, "-months");
490             fmt.type = FMT_F;
491             fmt.w = 3;
492             fmt.d = 0;
493
494             create_var (r, &fmt, 0, ds_cstr (&name), -1);
495
496             ds_destroy (&name);
497           }
498         default:
499           break;
500         }
501     }
502
503   PQclear (qres);
504
505   qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
506   if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
507     {
508       PQclear (qres);
509       goto error;
510     }
511   PQclear (qres);
512
513   r->cache_size = info->bsize != -1 ? info->bsize: 4096;
514
515   ds_init_empty (&r->fetch_cmd);
516   ds_put_format (&r->fetch_cmd,  "FETCH FORWARD %d FROM pspp", r->cache_size);
517
518   reload_cache (r);
519
520   return casereader_create_sequential
521     (NULL,
522      r->value_cnt,
523      n_cases,
524      &psql_casereader_class, r);
525
526  error:
527   dict_destroy (*dict);
528
529   psql_casereader_destroy (NULL, r);
530   return NULL;
531 }
532
533
534 static void
535 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
536 {
537   struct psql_reader *r = r_;
538   if (r == NULL)
539     return ;
540
541   ds_destroy (&r->fetch_cmd);
542   free (r->vmap);
543   if (r->res) PQclear (r->res);
544   PQfinish (r->conn);
545
546   free (r);
547 }
548
549
550
551 static struct ccase *
552 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
553 {
554   struct psql_reader *r = r_;
555
556   if ( NULL == r->res || r->tuple >= r->cache_size)
557     {
558       if ( ! reload_cache (r) )
559         return false;
560     }
561
562   return set_value (r);
563 }
564
565 static struct ccase *
566 set_value (struct psql_reader *r)
567 {
568   struct ccase *c;
569   int n_vars;
570   int i;
571
572   assert (r->res);
573
574   n_vars = PQnfields (r->res);
575
576   if ( r->tuple >= PQntuples (r->res))
577     return NULL;
578
579   c = case_create (r->value_cnt);
580   memset (case_data_rw_idx (c, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
581
582
583   for (i = 0 ; i < n_vars ; ++i )
584     {
585       Oid type = PQftype (r->res, i);
586       const struct variable *v = r->vmap[i];
587       union value *val = case_data_rw (c, v);
588
589       union value *val1 = NULL;
590
591       switch (type)
592         {
593         case INTERVALOID:
594         case TIMESTAMPTZOID:
595         case TIMETZOID:
596           if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
597             {
598               const struct variable *v1 = NULL;
599               v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
600
601               val1 = case_data_rw (c, v1);
602             }
603           break;
604         default:
605           break;
606         }
607
608
609       if (PQgetisnull (r->res, r->tuple, i))
610         {
611           value_set_missing (val, var_get_width (v));
612
613           switch (type)
614             {
615             case INTERVALOID:
616             case TIMESTAMPTZOID:
617             case TIMETZOID:
618               val1->f = SYSMIS;
619               break;
620             default:
621               break;
622             }
623         }
624       else
625         {
626           const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
627           int length = PQgetlength (r->res, r->tuple, i);
628
629           int var_width = var_get_width (v);
630           switch (type)
631             {
632             case BOOLOID:
633               {
634                 int8_t x;
635                 GET_VALUE (&vptr, x);
636                 val->f = x;
637               }
638               break;
639
640             case OIDOID:
641             case INT2OID:
642               {
643                 int16_t x;
644                 GET_VALUE (&vptr, x);
645                 val->f = x;
646               }
647               break;
648
649             case INT4OID:
650               {
651                 int32_t x;
652                 GET_VALUE (&vptr, x);
653                 val->f = x;
654               }
655               break;
656
657             case INT8OID:
658               {
659                 int64_t x;
660                 GET_VALUE (&vptr, x);
661                 val->f = x;
662               }
663               break;
664
665             case FLOAT4OID:
666               {
667                 float n;
668                 GET_VALUE (&vptr, n);
669                 val->f = n;
670               }
671               break;
672
673             case FLOAT8OID:
674               {
675                 double n;
676                 GET_VALUE (&vptr, n);
677                 val->f = n;
678               }
679               break;
680
681             case CASHOID:
682               {
683                 /* Postgres 8.3 uses 64 bits.
684                    Earlier versions use 32 */
685                 switch (length)
686                   {
687                   case 8:
688                     {
689                       int64_t x;
690                       GET_VALUE (&vptr, x);
691                       val->f = x / 100.0;
692                     }
693                     break;
694                   case 4:
695                     {
696                       int32_t x;
697                       GET_VALUE (&vptr, x);
698                       val->f = x / 100.0;
699                     }
700                     break;
701                   default:
702                     val->f = SYSMIS;
703                     break;
704                   }
705               }
706               break;
707
708             case INTERVALOID:
709               {
710                 if ( r->integer_datetimes )
711                   {
712                     uint32_t months;
713                     uint32_t days;
714                     uint32_t us;
715                     uint32_t things;
716
717                     GET_VALUE (&vptr, things);
718                     GET_VALUE (&vptr, us);
719                     GET_VALUE (&vptr, days);
720                     GET_VALUE (&vptr, months);
721
722                     val->f = us / 1000000.0;
723                     val->f += days * 24 * 3600;
724
725                     val1->f = months;
726                   }
727                 else
728                   {
729                     uint32_t days, months;
730                     double seconds;
731
732                     GET_VALUE (&vptr, seconds);
733                     GET_VALUE (&vptr, days);
734                     GET_VALUE (&vptr, months);
735
736                     val->f = seconds;
737                     val->f += days * 24 * 3600;
738
739                     val1->f = months;
740                   }
741               }
742               break;
743
744             case DATEOID:
745               {
746                 int32_t x;
747
748                 GET_VALUE (&vptr, x);
749
750                 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
751               }
752               break;
753
754             case TIMEOID:
755               {
756                 if ( r->integer_datetimes)
757                   {
758                     uint64_t x;
759                     GET_VALUE (&vptr, x);
760                     val->f = x / 1000000.0;
761                   }
762                 else
763                   {
764                     double x;
765                     GET_VALUE (&vptr, x);
766                     val->f = x;
767                   }
768               }
769               break;
770
771             case TIMETZOID:
772               {
773                 int32_t zone;
774                 if ( r->integer_datetimes)
775                   {
776                     uint64_t x;
777
778
779                     GET_VALUE (&vptr, x);
780                     val->f = x / 1000000.0;
781                   }
782                 else
783                   {
784                     double x;
785
786                     GET_VALUE (&vptr, x);
787                     val->f = x ;
788                   }
789
790                 GET_VALUE (&vptr, zone);
791                 val1->f = zone / 3600.0;
792               }
793               break;
794
795             case TIMESTAMPOID:
796             case TIMESTAMPTZOID:
797               {
798                 if ( r->integer_datetimes)
799                   {
800                     int64_t x;
801
802                     GET_VALUE (&vptr, x);
803
804                     x /= 1000000;
805
806                     val->f = (x + r->postgres_epoch * 24 * 3600 );
807                   }
808                 else
809                   {
810                     double x;
811
812                     GET_VALUE (&vptr, x);
813
814                     val->f = (x + r->postgres_epoch * 24 * 3600 );
815                   }
816               }
817               break;
818             case TEXTOID:
819             case VARCHAROID:
820             case BPCHAROID:
821             case BYTEAOID:
822               memcpy (val->s, (char *) vptr, MIN (length, var_width));
823               break;
824
825             case NUMERICOID:
826               {
827                 double f = 0.0;
828                 int i;
829                 int16_t n_digits, weight, dscale;
830                 uint16_t sign;
831
832                 GET_VALUE (&vptr, n_digits);
833                 GET_VALUE (&vptr, weight);
834                 GET_VALUE (&vptr, sign);
835                 GET_VALUE (&vptr, dscale);
836
837 #if 0
838                 {
839                   struct fmt_spec fmt;
840                   fmt.d = dscale;
841                   fmt.type = FMT_E;
842                   fmt.w = fmt_max_output_width (fmt.type) ;
843                   fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
844                   var_set_both_formats (v, &fmt);
845                 }
846 #endif
847
848                 for (i = 0 ; i < n_digits;  ++i)
849                   {
850                     uint16_t x;
851                     GET_VALUE (&vptr, x);
852                     f += x * pow (10000, weight--);
853                   }
854
855                 if ( sign == 0x4000)
856                   f *= -1.0;
857
858                 if ( sign == 0xC000)
859                   val->f = SYSMIS;
860                 else
861                   val->f = f;
862               }
863               break;
864
865             default:
866               val->f = SYSMIS;
867               break;
868             }
869         }
870     }
871
872   r->tuple++;
873
874   return c;
875 }
876
877 #endif