Set the dictionary's encoding when reading postgresql data.
[pspp-builds.git] / src / data / psql-reader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2008, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader-provider.h>
20 #include <libpspp/message.h>
21 #include <gl/xalloc.h>
22 #include <data/dictionary.h>
23 #include <stdlib.h>
24
25 #include "psql-reader.h"
26 #include "variable.h"
27 #include "format.h"
28 #include "calendar.h"
29
30 #include <inttypes.h>
31 #include <libpspp/str.h>
32
33 #include "gettext.h"
34 #define _(msgid) gettext (msgid)
35 #define N_(msgid) (msgid)
36
37
38 #if !PSQL_SUPPORT
39 struct casereader *
40 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
41 {
42   msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
43
44   return NULL;
45 }
46
47 #else
48
49 #include <stdint.h>
50 #include <libpq-fe.h>
51
52
53 /* These macros  must be the same as in catalog/pg_types.h from the postgres source */
54 #define BOOLOID            16
55 #define BYTEAOID           17
56 #define CHAROID            18
57 #define NAMEOID            19
58 #define INT8OID            20
59 #define INT2OID            21
60 #define INT4OID            23
61 #define TEXTOID            25
62 #define OIDOID             26
63 #define FLOAT4OID          700
64 #define FLOAT8OID          701
65 #define CASHOID            790
66 #define BPCHAROID          1042
67 #define VARCHAROID         1043
68 #define DATEOID            1082
69 #define TIMEOID            1083
70 #define TIMESTAMPOID       1114
71 #define TIMESTAMPTZOID     1184
72 #define INTERVALOID        1186
73 #define TIMETZOID          1266
74 #define NUMERICOID         1700
75
76 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
77
78 static struct ccase *psql_casereader_read (struct casereader *, void *);
79
80 static const struct casereader_class psql_casereader_class =
81   {
82     psql_casereader_read,
83     psql_casereader_destroy,
84     NULL,
85     NULL,
86   };
87
88 struct psql_reader
89 {
90   PGconn *conn;
91   PGresult *res;
92   int tuple;
93
94   bool integer_datetimes;
95
96   double postgres_epoch;
97
98   size_t value_cnt;
99   struct dictionary *dict;
100
101   /* An array of ints, which maps psql column numbers into
102      pspp variables */
103   struct variable **vmap;
104   size_t vmapsize;
105
106   struct string fetch_cmd;
107   int cache_size;
108 };
109
110
111 static struct ccase *set_value (struct psql_reader *r);
112
113
114
115 #if WORDS_BIGENDIAN
116 static void
117 data_to_native (const void *in_, void *out_, int len)
118 {
119   int i;
120   const unsigned char *in = in_;
121   unsigned char *out = out_;
122   for (i = 0 ; i < len ; ++i )
123     out[i] = in[i];
124 }
125 #else
126 static void
127 data_to_native (const void *in_, void *out_, int len)
128 {
129   int i;
130   const unsigned char *in = in_;
131   unsigned char *out = out_;
132   for (i = 0 ; i < len ; ++i )
133     out[len - i - 1] = in[i];
134 }
135 #endif
136
137
138 #define GET_VALUE(IN, OUT) do { \
139     size_t sz = sizeof (OUT); \
140     data_to_native (*(IN), &(OUT), sz) ; \
141     (*IN) += sz; \
142 } while (false)
143
144
145 #if 0
146 static void
147 dump (const unsigned char *x, int l)
148 {
149   int i;
150
151   for (i = 0; i < l ; ++i)
152     {
153       printf ("%02x ", x[i]);
154     }
155
156   putchar ('\n');
157
158   for (i = 0; i < l ; ++i)
159     {
160       if ( isprint (x[i]))
161         printf ("%c ", x[i]);
162       else
163         printf ("   ");
164     }
165
166   putchar ('\n');
167 }
168 #endif
169
170 static struct variable *
171 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
172             int width, const char *suggested_name, int col)
173 {
174   unsigned long int vx = 0;
175   struct variable *var;
176   char name[VAR_NAME_LEN + 1];
177
178   r->value_cnt += value_cnt_from_width (width);
179
180   if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
181     {
182       msg (ME, _("Cannot create variable name from %s"), suggested_name);
183       return NULL;
184     }
185
186   var = dict_create_var (r->dict, name, width);
187   var_set_both_formats (var, fmt);
188
189   if ( col != -1)
190     {
191       r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
192
193       r->vmap[col] = var;
194       r->vmapsize = col + 1;
195     }
196
197   return var;
198 }
199
200
201
202
203 /* Fill the cache */
204 static bool
205 reload_cache (struct psql_reader *r)
206 {
207   PQclear (r->res);
208   r->tuple = 0;
209
210   r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
211
212   if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
213     {
214       PQclear (r->res);
215       r->res = NULL;
216       return false;
217     }
218
219   return true;
220 }
221
222
223 struct casereader *
224 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
225 {
226   int i;
227   int n_fields, n_tuples;
228   PGresult *qres = NULL;
229   casenumber n_cases = CASENUMBER_MAX;
230
231   struct psql_reader *r = xzalloc (sizeof *r);
232   struct string query ;
233
234   r->conn = PQconnectdb (info->conninfo);
235   if ( NULL == r->conn)
236     {
237       msg (ME, _("Memory error whilst opening psql source"));
238       goto error;
239     }
240
241   if ( PQstatus (r->conn) != CONNECTION_OK )
242     {
243       msg (ME, _("Error opening psql source: %s."),
244            PQerrorMessage (r->conn));
245
246       goto error;
247     }
248
249   {
250     int ver_num;
251     const char *vers = PQparameterStatus (r->conn, "server_version");
252
253     sscanf (vers, "%d", &ver_num);
254
255     if ( ver_num < 8)
256       {
257         msg (ME,
258              _("Postgres server is version %s."
259                " Reading from versions earlier than 8.0 is not supported."),
260              vers);
261
262         goto error;
263       }
264   }
265
266   {
267     const char *dt =  PQparameterStatus (r->conn, "integer_datetimes");
268
269     r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
270   }
271
272 #if USE_SSL
273   if ( PQgetssl (r->conn) == NULL)
274 #endif
275     {
276       if (! info->allow_clear)
277         {
278           msg (ME, _("Connection is unencrypted, "
279                      "but unencrypted connections have not been permitted."));
280           goto error;
281         }
282     }
283
284   r->postgres_epoch =
285     calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
286
287
288   /* Create the dictionary and populate it */
289   *dict = r->dict = dict_create ();
290
291   {
292     const int enc = PQclientEncoding (r->conn);
293
294     /* According to section 22.2 of the Postgresql manual
295        a value of zero (SQL_ASCII) indicates
296        "a declaration of ignorance about the encoding".
297        Accordingly, we don't set the dictionary's encoding
298        if we find this value.
299     */
300     if ( enc != 0 )
301       dict_set_encoding (r->dict, pg_encoding_to_char (enc));
302   }
303
304   /*
305     select count (*) from (select * from medium) stupid_sql_standard;
306   */
307   ds_init_cstr (&query,
308                 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
309                 "DECLARE  pspp BINARY CURSOR FOR ");
310
311   ds_put_substring (&query, info->sql.ss);
312
313   qres = PQexec (r->conn, ds_cstr (&query));
314   ds_destroy (&query);
315   if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
316     {
317       msg (ME, _("Error from psql source: %s."),
318            PQresultErrorMessage (qres));
319       goto error;
320     }
321
322   PQclear (qres);
323
324
325   /* Now use the count() function to find the total number of cases
326      that this query returns.
327      Doing this incurs some overhead.  The server has to iterate every
328      case in order to find this number.  However, it's performed on the
329      server side, and in all except the most huge databases the extra
330      overhead will be worth the effort.
331      On the other hand, most PSPP functions don't need to know this.
332      The GUI is the notable exception.
333   */
334   ds_init_cstr (&query, "SELECT count (*) FROM (");
335   ds_put_substring (&query, info->sql.ss);
336   ds_put_cstr (&query, ") stupid_sql_standard");
337
338   qres = PQexec (r->conn, ds_cstr (&query));
339   ds_destroy (&query);
340   if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
341     {
342       msg (ME, _("Error from psql source: %s."),
343            PQresultErrorMessage (qres));
344       goto error;
345     }
346   n_cases = atol (PQgetvalue (qres, 0, 0));
347   PQclear (qres);
348
349   qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
350   if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
351     {
352       msg (ME, _("Error from psql source: %s."),
353            PQresultErrorMessage (qres));
354       goto error;
355     }
356
357   n_tuples = PQntuples (qres);
358   n_fields = PQnfields (qres);
359
360   r->value_cnt = 0;
361   r->vmap = NULL;
362   r->vmapsize = 0;
363
364   for (i = 0 ; i < n_fields ; ++i )
365     {
366       struct variable *var;
367       struct fmt_spec fmt = {FMT_F, 8, 2};
368       Oid type = PQftype (qres, i);
369       int width = 0;
370       int length ;
371
372       /* If there are no data then make a finger in the air 
373          guess at the contents */
374       if ( n_tuples > 0 )
375         length = PQgetlength (qres, 0, i);
376       else 
377         length = MAX_SHORT_STRING;
378
379       switch (type)
380         {
381         case BOOLOID:
382         case OIDOID:
383         case INT2OID:
384         case INT4OID:
385         case INT8OID:
386         case FLOAT4OID:
387         case FLOAT8OID:
388           fmt.type = FMT_F;
389           break;
390         case CASHOID:
391           fmt.type = FMT_DOLLAR;
392           break;
393         case CHAROID:
394           fmt.type = FMT_A;
395           width = length > 0 ? length : 1;
396           fmt.d = 0;
397           fmt.w = 1;
398           break;
399         case TEXTOID:
400         case VARCHAROID:
401         case BPCHAROID:
402           fmt.type = FMT_A;
403           width = (info->str_width == -1) ?
404             ROUND_UP (length, MAX_SHORT_STRING) : info->str_width;
405           fmt.w = width;
406           fmt.d = 0;
407           break;
408         case BYTEAOID:
409           fmt.type = FMT_AHEX;
410           width = length > 0 ? length : MAX_SHORT_STRING;
411           fmt.w = width * 2;
412           fmt.d = 0;
413           break;
414         case INTERVALOID:
415           fmt.type = FMT_DTIME;
416           width = 0;
417           fmt.d = 0;
418           fmt.w = 13;
419           break;
420         case DATEOID:
421           fmt.type = FMT_DATE;
422           width = 0;
423           fmt.w = 11;
424           fmt.d = 0;
425           break;
426         case TIMEOID:
427         case TIMETZOID:
428           fmt.type = FMT_TIME;
429           width = 0;
430           fmt.w = 11;
431           fmt.d = 0;
432           break;
433         case TIMESTAMPOID:
434         case TIMESTAMPTZOID:
435           fmt.type = FMT_DATETIME;
436           fmt.d = 0;
437           fmt.w = 22;
438           width = 0;
439           break;
440         case NUMERICOID:
441           fmt.type = FMT_E;
442           fmt.d = 2;
443           fmt.w = 40;
444           width = 0;
445           break;
446         default:
447           msg (MW, _("Unsupported OID %d.  SYSMIS values will be inserted."), type);
448           fmt.type = FMT_A;
449           width = length > 0 ? length : MAX_SHORT_STRING;
450           fmt.w = width ;
451           fmt.d = 0;
452           break;
453         }
454
455       var = create_var (r, &fmt, width, PQfname (qres, i), i);
456       if ( type == NUMERICOID && n_tuples > 0)
457         {
458           const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
459           struct fmt_spec fmt;
460           int16_t n_digits, weight, dscale;
461           uint16_t sign;
462
463           GET_VALUE (&vptr, n_digits);
464           GET_VALUE (&vptr, weight);
465           GET_VALUE (&vptr, sign);
466           GET_VALUE (&vptr, dscale);
467
468           fmt.d = dscale;
469           fmt.type = FMT_E;
470           fmt.w = fmt_max_output_width (fmt.type) ;
471           fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
472           var_set_both_formats (var, &fmt);
473         }
474
475       /* Timezones need an extra variable */
476       switch (type)
477         {
478         case TIMETZOID:
479           {
480             struct string name;
481             ds_init_cstr (&name, var_get_name (var));
482             ds_put_cstr (&name, "-zone");
483             fmt.type = FMT_F;
484             fmt.w = 8;
485             fmt.d = 2;
486
487             create_var (r, &fmt, 0, ds_cstr (&name), -1);
488
489             ds_destroy (&name);
490           }
491           break;
492
493         case INTERVALOID:
494           {
495             struct string name;
496             ds_init_cstr (&name, var_get_name (var));
497             ds_put_cstr (&name, "-months");
498             fmt.type = FMT_F;
499             fmt.w = 3;
500             fmt.d = 0;
501
502             create_var (r, &fmt, 0, ds_cstr (&name), -1);
503
504             ds_destroy (&name);
505           }
506         default:
507           break;
508         }
509     }
510
511   PQclear (qres);
512
513   qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
514   if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
515     {
516       PQclear (qres);
517       goto error;
518     }
519   PQclear (qres);
520
521   r->cache_size = info->bsize != -1 ? info->bsize: 4096;
522
523   ds_init_empty (&r->fetch_cmd);
524   ds_put_format (&r->fetch_cmd,  "FETCH FORWARD %d FROM pspp", r->cache_size);
525
526   reload_cache (r);
527
528   return casereader_create_sequential
529     (NULL,
530      r->value_cnt,
531      n_cases,
532      &psql_casereader_class, r);
533
534  error:
535   dict_destroy (*dict);
536
537   psql_casereader_destroy (NULL, r);
538   return NULL;
539 }
540
541
542 static void
543 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
544 {
545   struct psql_reader *r = r_;
546   if (r == NULL)
547     return ;
548
549   ds_destroy (&r->fetch_cmd);
550   free (r->vmap);
551   if (r->res) PQclear (r->res);
552   PQfinish (r->conn);
553
554   free (r);
555 }
556
557
558
559 static struct ccase *
560 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
561 {
562   struct psql_reader *r = r_;
563
564   if ( NULL == r->res || r->tuple >= r->cache_size)
565     {
566       if ( ! reload_cache (r) )
567         return false;
568     }
569
570   return set_value (r);
571 }
572
573 static struct ccase *
574 set_value (struct psql_reader *r)
575 {
576   struct ccase *c;
577   int n_vars;
578   int i;
579
580   assert (r->res);
581
582   n_vars = PQnfields (r->res);
583
584   if ( r->tuple >= PQntuples (r->res))
585     return NULL;
586
587   c = case_create (r->value_cnt);
588   memset (case_data_rw_idx (c, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
589
590
591   for (i = 0 ; i < n_vars ; ++i )
592     {
593       Oid type = PQftype (r->res, i);
594       const struct variable *v = r->vmap[i];
595       union value *val = case_data_rw (c, v);
596
597       union value *val1 = NULL;
598
599       switch (type)
600         {
601         case INTERVALOID:
602         case TIMESTAMPTZOID:
603         case TIMETZOID:
604           if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
605             {
606               const struct variable *v1 = NULL;
607               v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
608
609               val1 = case_data_rw (c, v1);
610             }
611           break;
612         default:
613           break;
614         }
615
616
617       if (PQgetisnull (r->res, r->tuple, i))
618         {
619           value_set_missing (val, var_get_width (v));
620
621           switch (type)
622             {
623             case INTERVALOID:
624             case TIMESTAMPTZOID:
625             case TIMETZOID:
626               val1->f = SYSMIS;
627               break;
628             default:
629               break;
630             }
631         }
632       else
633         {
634           const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
635           int length = PQgetlength (r->res, r->tuple, i);
636
637           int var_width = var_get_width (v);
638           switch (type)
639             {
640             case BOOLOID:
641               {
642                 int8_t x;
643                 GET_VALUE (&vptr, x);
644                 val->f = x;
645               }
646               break;
647
648             case OIDOID:
649             case INT2OID:
650               {
651                 int16_t x;
652                 GET_VALUE (&vptr, x);
653                 val->f = x;
654               }
655               break;
656
657             case INT4OID:
658               {
659                 int32_t x;
660                 GET_VALUE (&vptr, x);
661                 val->f = x;
662               }
663               break;
664
665             case INT8OID:
666               {
667                 int64_t x;
668                 GET_VALUE (&vptr, x);
669                 val->f = x;
670               }
671               break;
672
673             case FLOAT4OID:
674               {
675                 float n;
676                 GET_VALUE (&vptr, n);
677                 val->f = n;
678               }
679               break;
680
681             case FLOAT8OID:
682               {
683                 double n;
684                 GET_VALUE (&vptr, n);
685                 val->f = n;
686               }
687               break;
688
689             case CASHOID:
690               {
691                 /* Postgres 8.3 uses 64 bits.
692                    Earlier versions use 32 */
693                 switch (length)
694                   {
695                   case 8:
696                     {
697                       int64_t x;
698                       GET_VALUE (&vptr, x);
699                       val->f = x / 100.0;
700                     }
701                     break;
702                   case 4:
703                     {
704                       int32_t x;
705                       GET_VALUE (&vptr, x);
706                       val->f = x / 100.0;
707                     }
708                     break;
709                   default:
710                     val->f = SYSMIS;
711                     break;
712                   }
713               }
714               break;
715
716             case INTERVALOID:
717               {
718                 if ( r->integer_datetimes )
719                   {
720                     uint32_t months;
721                     uint32_t days;
722                     uint32_t us;
723                     uint32_t things;
724
725                     GET_VALUE (&vptr, things);
726                     GET_VALUE (&vptr, us);
727                     GET_VALUE (&vptr, days);
728                     GET_VALUE (&vptr, months);
729
730                     val->f = us / 1000000.0;
731                     val->f += days * 24 * 3600;
732
733                     val1->f = months;
734                   }
735                 else
736                   {
737                     uint32_t days, months;
738                     double seconds;
739
740                     GET_VALUE (&vptr, seconds);
741                     GET_VALUE (&vptr, days);
742                     GET_VALUE (&vptr, months);
743
744                     val->f = seconds;
745                     val->f += days * 24 * 3600;
746
747                     val1->f = months;
748                   }
749               }
750               break;
751
752             case DATEOID:
753               {
754                 int32_t x;
755
756                 GET_VALUE (&vptr, x);
757
758                 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
759               }
760               break;
761
762             case TIMEOID:
763               {
764                 if ( r->integer_datetimes)
765                   {
766                     uint64_t x;
767                     GET_VALUE (&vptr, x);
768                     val->f = x / 1000000.0;
769                   }
770                 else
771                   {
772                     double x;
773                     GET_VALUE (&vptr, x);
774                     val->f = x;
775                   }
776               }
777               break;
778
779             case TIMETZOID:
780               {
781                 int32_t zone;
782                 if ( r->integer_datetimes)
783                   {
784                     uint64_t x;
785
786
787                     GET_VALUE (&vptr, x);
788                     val->f = x / 1000000.0;
789                   }
790                 else
791                   {
792                     double x;
793
794                     GET_VALUE (&vptr, x);
795                     val->f = x ;
796                   }
797
798                 GET_VALUE (&vptr, zone);
799                 val1->f = zone / 3600.0;
800               }
801               break;
802
803             case TIMESTAMPOID:
804             case TIMESTAMPTZOID:
805               {
806                 if ( r->integer_datetimes)
807                   {
808                     int64_t x;
809
810                     GET_VALUE (&vptr, x);
811
812                     x /= 1000000;
813
814                     val->f = (x + r->postgres_epoch * 24 * 3600 );
815                   }
816                 else
817                   {
818                     double x;
819
820                     GET_VALUE (&vptr, x);
821
822                     val->f = (x + r->postgres_epoch * 24 * 3600 );
823                   }
824               }
825               break;
826             case TEXTOID:
827             case VARCHAROID:
828             case BPCHAROID:
829             case BYTEAOID:
830               memcpy (val->s, (char *) vptr, MIN (length, var_width));
831               break;
832
833             case NUMERICOID:
834               {
835                 double f = 0.0;
836                 int i;
837                 int16_t n_digits, weight, dscale;
838                 uint16_t sign;
839
840                 GET_VALUE (&vptr, n_digits);
841                 GET_VALUE (&vptr, weight);
842                 GET_VALUE (&vptr, sign);
843                 GET_VALUE (&vptr, dscale);
844
845 #if 0
846                 {
847                   struct fmt_spec fmt;
848                   fmt.d = dscale;
849                   fmt.type = FMT_E;
850                   fmt.w = fmt_max_output_width (fmt.type) ;
851                   fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
852                   var_set_both_formats (v, &fmt);
853                 }
854 #endif
855
856                 for (i = 0 ; i < n_digits;  ++i)
857                   {
858                     uint16_t x;
859                     GET_VALUE (&vptr, x);
860                     f += x * pow (10000, weight--);
861                   }
862
863                 if ( sign == 0x4000)
864                   f *= -1.0;
865
866                 if ( sign == 0xC000)
867                   val->f = SYSMIS;
868                 else
869                   val->f = f;
870               }
871               break;
872
873             default:
874               val->f = SYSMIS;
875               break;
876             }
877         }
878     }
879
880   r->tuple++;
881
882   return c;
883 }
884
885 #endif