Added the '=' to the plot subcommand's documentation.
[pspp-builds.git] / src / data / psql-reader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2008, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader-provider.h>
20 #include <libpspp/message.h>
21 #include <gl/xalloc.h>
22 #include <data/dictionary.h>
23 #include <math.h>
24 #include <stdlib.h>
25
26 #include "psql-reader.h"
27 #include "variable.h"
28 #include "format.h"
29 #include "calendar.h"
30
31 #include <inttypes.h>
32 #include <libpspp/misc.h>
33 #include <libpspp/str.h>
34
35 #include "minmax.h"
36
37 #include "gettext.h"
38 #define _(msgid) gettext (msgid)
39 #define N_(msgid) (msgid)
40
41
42 #if !PSQL_SUPPORT
43 struct casereader *
44 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
45 {
46   msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
47
48   return NULL;
49 }
50
51 #else
52
53 #include <stdint.h>
54 #include <libpq-fe.h>
55
56
57 /* Default width of string variables. */
58 #define PSQL_DEFAULT_WIDTH 8
59
60 /* These macros  must be the same as in catalog/pg_types.h from the postgres source */
61 #define BOOLOID            16
62 #define BYTEAOID           17
63 #define CHAROID            18
64 #define NAMEOID            19
65 #define INT8OID            20
66 #define INT2OID            21
67 #define INT4OID            23
68 #define TEXTOID            25
69 #define OIDOID             26
70 #define FLOAT4OID          700
71 #define FLOAT8OID          701
72 #define CASHOID            790
73 #define BPCHAROID          1042
74 #define VARCHAROID         1043
75 #define DATEOID            1082
76 #define TIMEOID            1083
77 #define TIMESTAMPOID       1114
78 #define TIMESTAMPTZOID     1184
79 #define INTERVALOID        1186
80 #define TIMETZOID          1266
81 #define NUMERICOID         1700
82
83 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
84
85 static struct ccase *psql_casereader_read (struct casereader *, void *);
86
87 static const struct casereader_class psql_casereader_class =
88   {
89     psql_casereader_read,
90     psql_casereader_destroy,
91     NULL,
92     NULL,
93   };
94
95 struct psql_reader
96 {
97   PGconn *conn;
98   PGresult *res;
99   int tuple;
100
101   bool integer_datetimes;
102
103   double postgres_epoch;
104
105   struct caseproto *proto;
106   struct dictionary *dict;
107
108   /* An array of ints, which maps psql column numbers into
109      pspp variables */
110   struct variable **vmap;
111   size_t vmapsize;
112
113   struct string fetch_cmd;
114   int cache_size;
115 };
116
117
118 static struct ccase *set_value (struct psql_reader *r);
119
120
121
122 #if WORDS_BIGENDIAN
123 static void
124 data_to_native (const void *in_, void *out_, int len)
125 {
126   int i;
127   const unsigned char *in = in_;
128   unsigned char *out = out_;
129   for (i = 0 ; i < len ; ++i )
130     out[i] = in[i];
131 }
132 #else
133 static void
134 data_to_native (const void *in_, void *out_, int len)
135 {
136   int i;
137   const unsigned char *in = in_;
138   unsigned char *out = out_;
139   for (i = 0 ; i < len ; ++i )
140     out[len - i - 1] = in[i];
141 }
142 #endif
143
144
145 #define GET_VALUE(IN, OUT) do { \
146     size_t sz = sizeof (OUT); \
147     data_to_native (*(IN), &(OUT), sz) ; \
148     (*IN) += sz; \
149 } while (false)
150
151
152 #if 0
153 static void
154 dump (const unsigned char *x, int l)
155 {
156   int i;
157
158   for (i = 0; i < l ; ++i)
159     {
160       printf ("%02x ", x[i]);
161     }
162
163   putchar ('\n');
164
165   for (i = 0; i < l ; ++i)
166     {
167       if ( isprint (x[i]))
168         printf ("%c ", x[i]);
169       else
170         printf ("   ");
171     }
172
173   putchar ('\n');
174 }
175 #endif
176
177 static struct variable *
178 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
179             int width, const char *suggested_name, int col)
180 {
181   unsigned long int vx = 0;
182   struct variable *var;
183   char name[VAR_NAME_LEN + 1];
184
185   if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
186     {
187       msg (ME, _("Cannot create variable name from %s"), suggested_name);
188       return NULL;
189     }
190
191   var = dict_create_var (r->dict, name, width);
192   var_set_both_formats (var, fmt);
193
194   if ( col != -1)
195     {
196       r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
197
198       r->vmap[col] = var;
199       r->vmapsize = col + 1;
200     }
201
202   return var;
203 }
204
205
206
207
208 /* Fill the cache */
209 static bool
210 reload_cache (struct psql_reader *r)
211 {
212   PQclear (r->res);
213   r->tuple = 0;
214
215   r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
216
217   if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
218     {
219       PQclear (r->res);
220       r->res = NULL;
221       return false;
222     }
223
224   return true;
225 }
226
227
228 struct casereader *
229 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
230 {
231   int i;
232   int n_fields, n_tuples;
233   PGresult *qres = NULL;
234   casenumber n_cases = CASENUMBER_MAX;
235
236   struct psql_reader *r = xzalloc (sizeof *r);
237   struct string query ;
238
239   r->conn = PQconnectdb (info->conninfo);
240   if ( NULL == r->conn)
241     {
242       msg (ME, _("Memory error whilst opening psql source"));
243       goto error;
244     }
245
246   if ( PQstatus (r->conn) != CONNECTION_OK )
247     {
248       msg (ME, _("Error opening psql source: %s."),
249            PQerrorMessage (r->conn));
250
251       goto error;
252     }
253
254   {
255     int ver_num;
256     const char *vers = PQparameterStatus (r->conn, "server_version");
257
258     sscanf (vers, "%d", &ver_num);
259
260     if ( ver_num < 8)
261       {
262         msg (ME,
263              _("Postgres server is version %s."
264                " Reading from versions earlier than 8.0 is not supported."),
265              vers);
266
267         goto error;
268       }
269   }
270
271   {
272     const char *dt =  PQparameterStatus (r->conn, "integer_datetimes");
273
274     r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
275   }
276
277 #if USE_SSL
278   if ( PQgetssl (r->conn) == NULL)
279 #endif
280     {
281       if (! info->allow_clear)
282         {
283           msg (ME, _("Connection is unencrypted, "
284                      "but unencrypted connections have not been permitted."));
285           goto error;
286         }
287     }
288
289   r->postgres_epoch =
290     calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
291
292
293   /* Create the dictionary and populate it */
294   *dict = r->dict = dict_create ();
295
296   {
297     const int enc = PQclientEncoding (r->conn);
298
299     /* According to section 22.2 of the Postgresql manual
300        a value of zero (SQL_ASCII) indicates
301        "a declaration of ignorance about the encoding".
302        Accordingly, we don't set the dictionary's encoding
303        if we find this value.
304     */
305     if ( enc != 0 )
306       dict_set_encoding (r->dict, pg_encoding_to_char (enc));
307   }
308
309   /*
310     select count (*) from (select * from medium) stupid_sql_standard;
311   */
312   ds_init_cstr (&query,
313                 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
314                 "DECLARE  pspp BINARY CURSOR FOR ");
315
316   ds_put_substring (&query, info->sql.ss);
317
318   qres = PQexec (r->conn, ds_cstr (&query));
319   ds_destroy (&query);
320   if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
321     {
322       msg (ME, _("Error from psql source: %s."),
323            PQresultErrorMessage (qres));
324       goto error;
325     }
326
327   PQclear (qres);
328
329
330   /* Now use the count() function to find the total number of cases
331      that this query returns.
332      Doing this incurs some overhead.  The server has to iterate every
333      case in order to find this number.  However, it's performed on the
334      server side, and in all except the most huge databases the extra
335      overhead will be worth the effort.
336      On the other hand, most PSPP functions don't need to know this.
337      The GUI is the notable exception.
338   */
339   ds_init_cstr (&query, "SELECT count (*) FROM (");
340   ds_put_substring (&query, info->sql.ss);
341   ds_put_cstr (&query, ") stupid_sql_standard");
342
343   qres = PQexec (r->conn, ds_cstr (&query));
344   ds_destroy (&query);
345   if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
346     {
347       msg (ME, _("Error from psql source: %s."),
348            PQresultErrorMessage (qres));
349       goto error;
350     }
351   n_cases = atol (PQgetvalue (qres, 0, 0));
352   PQclear (qres);
353
354   qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
355   if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
356     {
357       msg (ME, _("Error from psql source: %s."),
358            PQresultErrorMessage (qres));
359       goto error;
360     }
361
362   n_tuples = PQntuples (qres);
363   n_fields = PQnfields (qres);
364
365   r->proto = NULL;
366   r->vmap = NULL;
367   r->vmapsize = 0;
368
369   for (i = 0 ; i < n_fields ; ++i )
370     {
371       struct variable *var;
372       struct fmt_spec fmt = {FMT_F, 8, 2};
373       Oid type = PQftype (qres, i);
374       int width = 0;
375       int length ;
376
377       /* If there are no data then make a finger in the air 
378          guess at the contents */
379       if ( n_tuples > 0 )
380         length = PQgetlength (qres, 0, i);
381       else 
382         length = PSQL_DEFAULT_WIDTH;
383
384       switch (type)
385         {
386         case BOOLOID:
387         case OIDOID:
388         case INT2OID:
389         case INT4OID:
390         case INT8OID:
391         case FLOAT4OID:
392         case FLOAT8OID:
393           fmt.type = FMT_F;
394           break;
395         case CASHOID:
396           fmt.type = FMT_DOLLAR;
397           break;
398         case CHAROID:
399           fmt.type = FMT_A;
400           width = length > 0 ? length : 1;
401           fmt.d = 0;
402           fmt.w = 1;
403           break;
404         case TEXTOID:
405         case VARCHAROID:
406         case BPCHAROID:
407           fmt.type = FMT_A;
408           width = (info->str_width == -1) ?
409             ROUND_UP (length, PSQL_DEFAULT_WIDTH) : info->str_width;
410           fmt.w = width;
411           fmt.d = 0;
412           break;
413         case BYTEAOID:
414           fmt.type = FMT_AHEX;
415           width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
416           fmt.w = width * 2;
417           fmt.d = 0;
418           break;
419         case INTERVALOID:
420           fmt.type = FMT_DTIME;
421           width = 0;
422           fmt.d = 0;
423           fmt.w = 13;
424           break;
425         case DATEOID:
426           fmt.type = FMT_DATE;
427           width = 0;
428           fmt.w = 11;
429           fmt.d = 0;
430           break;
431         case TIMEOID:
432         case TIMETZOID:
433           fmt.type = FMT_TIME;
434           width = 0;
435           fmt.w = 11;
436           fmt.d = 0;
437           break;
438         case TIMESTAMPOID:
439         case TIMESTAMPTZOID:
440           fmt.type = FMT_DATETIME;
441           fmt.d = 0;
442           fmt.w = 22;
443           width = 0;
444           break;
445         case NUMERICOID:
446           fmt.type = FMT_E;
447           fmt.d = 2;
448           fmt.w = 40;
449           width = 0;
450           break;
451         default:
452           msg (MW, _("Unsupported OID %d.  SYSMIS values will be inserted."), type);
453           fmt.type = FMT_A;
454           width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
455           fmt.w = width ;
456           fmt.d = 0;
457           break;
458         }
459
460       if ( width == 0 && fmt_is_string (fmt.type))
461         fmt.w = width = PSQL_DEFAULT_WIDTH;
462
463
464       var = create_var (r, &fmt, width, PQfname (qres, i), i);
465       if ( type == NUMERICOID && n_tuples > 0)
466         {
467           const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
468           struct fmt_spec fmt;
469           int16_t n_digits, weight, dscale;
470           uint16_t sign;
471
472           GET_VALUE (&vptr, n_digits);
473           GET_VALUE (&vptr, weight);
474           GET_VALUE (&vptr, sign);
475           GET_VALUE (&vptr, dscale);
476
477           fmt.d = dscale;
478           fmt.type = FMT_E;
479           fmt.w = fmt_max_output_width (fmt.type) ;
480           fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
481           var_set_both_formats (var, &fmt);
482         }
483
484       /* Timezones need an extra variable */
485       switch (type)
486         {
487         case TIMETZOID:
488           {
489             struct string name;
490             ds_init_cstr (&name, var_get_name (var));
491             ds_put_cstr (&name, "-zone");
492             fmt.type = FMT_F;
493             fmt.w = 8;
494             fmt.d = 2;
495
496             create_var (r, &fmt, 0, ds_cstr (&name), -1);
497
498             ds_destroy (&name);
499           }
500           break;
501
502         case INTERVALOID:
503           {
504             struct string name;
505             ds_init_cstr (&name, var_get_name (var));
506             ds_put_cstr (&name, "-months");
507             fmt.type = FMT_F;
508             fmt.w = 3;
509             fmt.d = 0;
510
511             create_var (r, &fmt, 0, ds_cstr (&name), -1);
512
513             ds_destroy (&name);
514           }
515         default:
516           break;
517         }
518     }
519
520   PQclear (qres);
521
522   qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
523   if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
524     {
525       PQclear (qres);
526       goto error;
527     }
528   PQclear (qres);
529
530   r->cache_size = info->bsize != -1 ? info->bsize: 4096;
531
532   ds_init_empty (&r->fetch_cmd);
533   ds_put_format (&r->fetch_cmd,  "FETCH FORWARD %d FROM pspp", r->cache_size);
534
535   reload_cache (r);
536   r->proto = caseproto_ref (dict_get_proto (*dict));
537
538   return casereader_create_sequential
539     (NULL,
540      r->proto,
541      n_cases,
542      &psql_casereader_class, r);
543
544  error:
545   dict_destroy (*dict);
546
547   psql_casereader_destroy (NULL, r);
548   return NULL;
549 }
550
551
552 static void
553 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
554 {
555   struct psql_reader *r = r_;
556   if (r == NULL)
557     return ;
558
559   ds_destroy (&r->fetch_cmd);
560   free (r->vmap);
561   if (r->res) PQclear (r->res);
562   PQfinish (r->conn);
563   caseproto_unref (r->proto);
564
565   free (r);
566 }
567
568
569
570 static struct ccase *
571 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
572 {
573   struct psql_reader *r = r_;
574
575   if ( NULL == r->res || r->tuple >= r->cache_size)
576     {
577       if ( ! reload_cache (r) )
578         return false;
579     }
580
581   return set_value (r);
582 }
583
584 static struct ccase *
585 set_value (struct psql_reader *r)
586 {
587   struct ccase *c;
588   int n_vars;
589   int i;
590
591   assert (r->res);
592
593   n_vars = PQnfields (r->res);
594
595   if ( r->tuple >= PQntuples (r->res))
596     return NULL;
597
598   c = case_create (r->proto);
599   case_set_missing (c);
600
601
602   for (i = 0 ; i < n_vars ; ++i )
603     {
604       Oid type = PQftype (r->res, i);
605       const struct variable *v = r->vmap[i];
606       union value *val = case_data_rw (c, v);
607
608       union value *val1 = NULL;
609
610       switch (type)
611         {
612         case INTERVALOID:
613         case TIMESTAMPTZOID:
614         case TIMETZOID:
615           if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
616             {
617               const struct variable *v1 = NULL;
618               v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
619
620               val1 = case_data_rw (c, v1);
621             }
622           break;
623         default:
624           break;
625         }
626
627
628       if (PQgetisnull (r->res, r->tuple, i))
629         {
630           value_set_missing (val, var_get_width (v));
631
632           switch (type)
633             {
634             case INTERVALOID:
635             case TIMESTAMPTZOID:
636             case TIMETZOID:
637               val1->f = SYSMIS;
638               break;
639             default:
640               break;
641             }
642         }
643       else
644         {
645           const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
646           int length = PQgetlength (r->res, r->tuple, i);
647
648           int var_width = var_get_width (v);
649           switch (type)
650             {
651             case BOOLOID:
652               {
653                 int8_t x;
654                 GET_VALUE (&vptr, x);
655                 val->f = x;
656               }
657               break;
658
659             case OIDOID:
660             case INT2OID:
661               {
662                 int16_t x;
663                 GET_VALUE (&vptr, x);
664                 val->f = x;
665               }
666               break;
667
668             case INT4OID:
669               {
670                 int32_t x;
671                 GET_VALUE (&vptr, x);
672                 val->f = x;
673               }
674               break;
675
676             case INT8OID:
677               {
678                 int64_t x;
679                 GET_VALUE (&vptr, x);
680                 val->f = x;
681               }
682               break;
683
684             case FLOAT4OID:
685               {
686                 float n;
687                 GET_VALUE (&vptr, n);
688                 val->f = n;
689               }
690               break;
691
692             case FLOAT8OID:
693               {
694                 double n;
695                 GET_VALUE (&vptr, n);
696                 val->f = n;
697               }
698               break;
699
700             case CASHOID:
701               {
702                 /* Postgres 8.3 uses 64 bits.
703                    Earlier versions use 32 */
704                 switch (length)
705                   {
706                   case 8:
707                     {
708                       int64_t x;
709                       GET_VALUE (&vptr, x);
710                       val->f = x / 100.0;
711                     }
712                     break;
713                   case 4:
714                     {
715                       int32_t x;
716                       GET_VALUE (&vptr, x);
717                       val->f = x / 100.0;
718                     }
719                     break;
720                   default:
721                     val->f = SYSMIS;
722                     break;
723                   }
724               }
725               break;
726
727             case INTERVALOID:
728               {
729                 if ( r->integer_datetimes )
730                   {
731                     uint32_t months;
732                     uint32_t days;
733                     uint32_t us;
734                     uint32_t things;
735
736                     GET_VALUE (&vptr, things);
737                     GET_VALUE (&vptr, us);
738                     GET_VALUE (&vptr, days);
739                     GET_VALUE (&vptr, months);
740
741                     val->f = us / 1000000.0;
742                     val->f += days * 24 * 3600;
743
744                     val1->f = months;
745                   }
746                 else
747                   {
748                     uint32_t days, months;
749                     double seconds;
750
751                     GET_VALUE (&vptr, seconds);
752                     GET_VALUE (&vptr, days);
753                     GET_VALUE (&vptr, months);
754
755                     val->f = seconds;
756                     val->f += days * 24 * 3600;
757
758                     val1->f = months;
759                   }
760               }
761               break;
762
763             case DATEOID:
764               {
765                 int32_t x;
766
767                 GET_VALUE (&vptr, x);
768
769                 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
770               }
771               break;
772
773             case TIMEOID:
774               {
775                 if ( r->integer_datetimes)
776                   {
777                     uint64_t x;
778                     GET_VALUE (&vptr, x);
779                     val->f = x / 1000000.0;
780                   }
781                 else
782                   {
783                     double x;
784                     GET_VALUE (&vptr, x);
785                     val->f = x;
786                   }
787               }
788               break;
789
790             case TIMETZOID:
791               {
792                 int32_t zone;
793                 if ( r->integer_datetimes)
794                   {
795                     uint64_t x;
796
797
798                     GET_VALUE (&vptr, x);
799                     val->f = x / 1000000.0;
800                   }
801                 else
802                   {
803                     double x;
804
805                     GET_VALUE (&vptr, x);
806                     val->f = x ;
807                   }
808
809                 GET_VALUE (&vptr, zone);
810                 val1->f = zone / 3600.0;
811               }
812               break;
813
814             case TIMESTAMPOID:
815             case TIMESTAMPTZOID:
816               {
817                 if ( r->integer_datetimes)
818                   {
819                     int64_t x;
820
821                     GET_VALUE (&vptr, x);
822
823                     x /= 1000000;
824
825                     val->f = (x + r->postgres_epoch * 24 * 3600 );
826                   }
827                 else
828                   {
829                     double x;
830
831                     GET_VALUE (&vptr, x);
832
833                     val->f = (x + r->postgres_epoch * 24 * 3600 );
834                   }
835               }
836               break;
837             case TEXTOID:
838             case VARCHAROID:
839             case BPCHAROID:
840             case BYTEAOID:
841               memcpy (value_str_rw (val, var_width), (char *) vptr,
842                       MIN (length, var_width));
843               break;
844
845             case NUMERICOID:
846               {
847                 double f = 0.0;
848                 int i;
849                 int16_t n_digits, weight, dscale;
850                 uint16_t sign;
851
852                 GET_VALUE (&vptr, n_digits);
853                 GET_VALUE (&vptr, weight);
854                 GET_VALUE (&vptr, sign);
855                 GET_VALUE (&vptr, dscale);
856
857 #if 0
858                 {
859                   struct fmt_spec fmt;
860                   fmt.d = dscale;
861                   fmt.type = FMT_E;
862                   fmt.w = fmt_max_output_width (fmt.type) ;
863                   fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
864                   var_set_both_formats (v, &fmt);
865                 }
866 #endif
867
868                 for (i = 0 ; i < n_digits;  ++i)
869                   {
870                     uint16_t x;
871                     GET_VALUE (&vptr, x);
872                     f += x * pow (10000, weight--);
873                   }
874
875                 if ( sign == 0x4000)
876                   f *= -1.0;
877
878                 if ( sign == 0xC000)
879                   val->f = SYSMIS;
880                 else
881                   val->f = f;
882               }
883               break;
884
885             default:
886               val->f = SYSMIS;
887               break;
888             }
889         }
890     }
891
892   r->tuple++;
893
894   return c;
895 }
896
897 #endif