Fixed bug reporting the significance of paired value t-test.
[pspp-builds.git] / src / data / psql-reader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2008 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader-provider.h>
20 #include <libpspp/message.h>
21 #include <gl/xalloc.h>
22 #include <data/dictionary.h>
23 #include <stdlib.h>
24
25 #include "psql-reader.h"
26 #include "variable.h"
27 #include "format.h"
28 #include "calendar.h"
29
30 #include <inttypes.h>
31 #include <libpspp/str.h>
32
33 #include "gettext.h"
34 #define _(msgid) gettext (msgid)
35 #define N_(msgid) (msgid)
36
37
38 #if !PSQL_SUPPORT
39 struct casereader *
40 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
41 {
42   msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
43
44   return NULL;
45 }
46
47 #else
48
49 #include <stdint.h>
50 #include <libpq-fe.h>
51
52
53 /* These macros  must be the same as in catalog/pg_types.h from the postgres source */
54 #define BOOLOID            16
55 #define BYTEAOID           17
56 #define CHAROID            18
57 #define NAMEOID            19
58 #define INT8OID            20
59 #define INT2OID            21
60 #define INT4OID            23
61 #define TEXTOID            25
62 #define OIDOID             26
63 #define FLOAT4OID          700
64 #define FLOAT8OID          701
65 #define CASHOID            790
66 #define BPCHAROID          1042
67 #define VARCHAROID         1043
68 #define DATEOID            1082
69 #define TIMEOID            1083
70 #define TIMESTAMPOID       1114
71 #define TIMESTAMPTZOID     1184
72 #define INTERVALOID        1186
73 #define TIMETZOID          1266
74 #define NUMERICOID         1700
75
76 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
77
78 static bool psql_casereader_read (struct casereader *, void *,
79                                   struct ccase *);
80
81 static const struct casereader_class psql_casereader_class =
82   {
83     psql_casereader_read,
84     psql_casereader_destroy,
85     NULL,
86     NULL,
87   };
88
89 struct psql_reader
90 {
91   PGconn *conn;
92   PGresult *res;
93   int tuple;
94
95   bool integer_datetimes;
96
97   double postgres_epoch;
98
99   size_t value_cnt;
100   struct dictionary *dict;
101
102   /* An array of ints, which maps psql column numbers into
103      pspp variables */
104   struct variable **vmap;
105   size_t vmapsize;
106
107   struct string fetch_cmd;
108   int cache_size;
109 };
110
111
112 static bool set_value (struct psql_reader *r,
113                        struct ccase *cc);
114
115
116
117 #if WORDS_BIGENDIAN
118 static void
119 data_to_native (const void *in_, void *out_, int len)
120 {
121   int i;
122   const unsigned char *in = in_;
123   unsigned char *out = out_;
124   for (i = 0 ; i < len ; ++i )
125     out[i] = in[i];
126 }
127 #else
128 static void
129 data_to_native (const void *in_, void *out_, int len)
130 {
131   int i;
132   const unsigned char *in = in_;
133   unsigned char *out = out_;
134   for (i = 0 ; i < len ; ++i )
135     out[len - i - 1] = in[i];
136 }
137 #endif
138
139
140 #define GET_VALUE(IN, OUT) do { \
141     size_t sz = sizeof (OUT); \
142     data_to_native (*(IN), &(OUT), sz) ; \
143     (*IN) += sz; \
144 } while (false)
145
146
147 #if 0
148 static void
149 dump (const unsigned char *x, int l)
150 {
151   int i;
152
153   for (i = 0; i < l ; ++i)
154     {
155       printf ("%02x ", x[i]);
156     }
157
158   putchar ('\n');
159
160   for (i = 0; i < l ; ++i)
161     {
162       if ( isprint (x[i]))
163         printf ("%c ", x[i]);
164       else
165         printf ("   ");
166     }
167
168   putchar ('\n');
169 }
170 #endif
171
172 static struct variable *
173 create_var (struct psql_reader *r, const struct fmt_spec *fmt,
174             int width, const char *suggested_name, int col)
175 {
176   unsigned long int vx = 0;
177   struct variable *var;
178   char name[VAR_NAME_LEN + 1];
179
180   r->value_cnt += value_cnt_from_width (width);
181
182   if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
183     {
184       msg (ME, _("Cannot create variable name from %s"), suggested_name);
185       return NULL;
186     }
187
188   var = dict_create_var (r->dict, name, width);
189   var_set_both_formats (var, fmt);
190
191   if ( col != -1)
192     {
193       r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
194
195       r->vmap[col] = var;
196       r->vmapsize = col + 1;
197     }
198
199   return var;
200 }
201
202
203
204
205 /* Fill the cache */
206 static bool
207 reload_cache (struct psql_reader *r)
208 {
209   PQclear (r->res);
210   r->tuple = 0;
211
212   r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
213
214   if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
215     {
216       PQclear (r->res);
217       r->res = NULL;
218       return false;
219     }
220
221   return true;
222 }
223
224
225 struct casereader *
226 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
227 {
228   int i;
229   int n_fields, n_tuples;
230   PGresult *qres = NULL;
231   casenumber n_cases = CASENUMBER_MAX;
232
233   struct psql_reader *r = xzalloc (sizeof *r);
234   struct string query ;
235
236   r->conn = PQconnectdb (info->conninfo);
237   if ( NULL == r->conn)
238     {
239       msg (ME, _("Memory error whilst opening psql source"));
240       goto error;
241     }
242
243   if ( PQstatus (r->conn) != CONNECTION_OK )
244     {
245       msg (ME, _("Error opening psql source: %s."),
246            PQerrorMessage (r->conn));
247
248       goto error;
249     }
250
251   {
252     int ver_num;
253     const char *vers = PQparameterStatus (r->conn, "server_version");
254
255     sscanf (vers, "%d", &ver_num);
256
257     if ( ver_num < 8)
258       {
259         msg (ME,
260              _("Postgres server is version %s."
261                " Reading from versions earlier than 8.0 is not supported."),
262              vers);
263
264         goto error;
265       }
266   }
267
268   {
269     const char *dt =  PQparameterStatus (r->conn, "integer_datetimes");
270
271     r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
272   }
273
274 #if USE_SSL
275   if ( PQgetssl (r->conn) == NULL)
276 #endif
277     {
278       if (! info->allow_clear)
279         {
280           msg (ME, _("Connection is unencrypted, "
281                      "but unencrypted connections have not been permitted."));
282           goto error;
283         }
284     }
285
286   r->postgres_epoch =
287     calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
288
289
290   /* Create the dictionary and populate it */
291   *dict = r->dict = dict_create ();
292
293   /*
294     select count (*) from (select * from medium) stupid_sql_standard;
295   */
296
297   ds_init_cstr (&query,
298                 "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
299                 "DECLARE  pspp BINARY CURSOR FOR ");
300
301   ds_put_substring (&query, info->sql.ss);
302
303   qres = PQexec (r->conn, ds_cstr (&query));
304   ds_destroy (&query);
305   if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
306     {
307       msg (ME, _("Error from psql source: %s."),
308            PQresultErrorMessage (qres));
309       goto error;
310     }
311
312   PQclear (qres);
313
314
315   /* Now use the count() function to find the total number of cases
316      that this query returns.
317      Doing this incurs some overhead.  The server has to iterate every
318      case in order to find this number.  However, it's performed on the
319      server side, and in all except the most huge databases the extra
320      overhead will be worth the effort.
321      On the other hand, most PSPP functions don't need to know this.
322      The GUI is the notable exception.
323   */
324   ds_init_cstr (&query, "SELECT count (*) FROM (");
325   ds_put_substring (&query, info->sql.ss);
326   ds_put_cstr (&query, ") stupid_sql_standard");
327
328   qres = PQexec (r->conn, ds_cstr (&query));
329   ds_destroy (&query);
330   if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
331     {
332       msg (ME, _("Error from psql source: %s."),
333            PQresultErrorMessage (qres));
334       goto error;
335     }
336   n_cases = atol (PQgetvalue (qres, 0, 0));
337   PQclear (qres);
338
339   qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
340   if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
341     {
342       msg (ME, _("Error from psql source: %s."),
343            PQresultErrorMessage (qres));
344       goto error;
345     }
346
347   n_tuples = PQntuples (qres);
348   n_fields = PQnfields (qres);
349
350   r->value_cnt = 0;
351   r->vmap = NULL;
352   r->vmapsize = 0;
353
354   for (i = 0 ; i < n_fields ; ++i )
355     {
356       struct variable *var;
357       struct fmt_spec fmt = {FMT_F, 8, 2};
358       Oid type = PQftype (qres, i);
359       int width = 0;
360       int length ;
361
362       /* If there are no data then make a finger in the air 
363          guess at the contents */
364       if ( n_tuples > 0 )
365         length = PQgetlength (qres, 0, i);
366       else 
367         length = MAX_SHORT_STRING;
368
369       switch (type)
370         {
371         case BOOLOID:
372         case OIDOID:
373         case INT2OID:
374         case INT4OID:
375         case INT8OID:
376         case FLOAT4OID:
377         case FLOAT8OID:
378           fmt.type = FMT_F;
379           break;
380         case CASHOID:
381           fmt.type = FMT_DOLLAR;
382           break;
383         case CHAROID:
384           fmt.type = FMT_A;
385           width = length > 0 ? length : 1;
386           fmt.d = 0;
387           fmt.w = 1;
388           break;
389         case TEXTOID:
390         case VARCHAROID:
391         case BPCHAROID:
392           fmt.type = FMT_A;
393           width = (info->str_width == -1) ?
394             ROUND_UP (length, MAX_SHORT_STRING) : info->str_width;
395           fmt.w = width;
396           fmt.d = 0;
397           break;
398         case BYTEAOID:
399           fmt.type = FMT_AHEX;
400           width = length > 0 ? length : MAX_SHORT_STRING;
401           fmt.w = width * 2;
402           fmt.d = 0;
403           break;
404         case INTERVALOID:
405           fmt.type = FMT_DTIME;
406           width = 0;
407           fmt.d = 0;
408           fmt.w = 13;
409           break;
410         case DATEOID:
411           fmt.type = FMT_DATE;
412           width = 0;
413           fmt.w = 11;
414           fmt.d = 0;
415           break;
416         case TIMEOID:
417         case TIMETZOID:
418           fmt.type = FMT_TIME;
419           width = 0;
420           fmt.w = 11;
421           fmt.d = 0;
422           break;
423         case TIMESTAMPOID:
424         case TIMESTAMPTZOID:
425           fmt.type = FMT_DATETIME;
426           fmt.d = 0;
427           fmt.w = 22;
428           width = 0;
429           break;
430         case NUMERICOID:
431           fmt.type = FMT_E;
432           fmt.d = 2;
433           fmt.w = 40;
434           width = 0;
435           break;
436         default:
437           msg (MW, _("Unsupported OID %d.  SYSMIS values will be inserted."), type);
438           fmt.type = FMT_A;
439           width = length > 0 ? length : MAX_SHORT_STRING;
440           fmt.w = width ;
441           fmt.d = 0;
442           break;
443         }
444
445       if ( width == 0 && fmt_is_string (fmt.type))
446         fmt.w = width = MAX_SHORT_STRING;
447
448
449       var = create_var (r, &fmt, width, PQfname (qres, i), i);
450       if ( type == NUMERICOID && n_tuples > 0)
451         {
452           const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
453           struct fmt_spec fmt;
454           int16_t n_digits, weight, dscale;
455           uint16_t sign;
456
457           GET_VALUE (&vptr, n_digits);
458           GET_VALUE (&vptr, weight);
459           GET_VALUE (&vptr, sign);
460           GET_VALUE (&vptr, dscale);
461
462           fmt.d = dscale;
463           fmt.type = FMT_E;
464           fmt.w = fmt_max_output_width (fmt.type) ;
465           fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
466           var_set_both_formats (var, &fmt);
467         }
468
469       /* Timezones need an extra variable */
470       switch (type)
471         {
472         case TIMETZOID:
473           {
474             struct string name;
475             ds_init_cstr (&name, var_get_name (var));
476             ds_put_cstr (&name, "-zone");
477             fmt.type = FMT_F;
478             fmt.w = 8;
479             fmt.d = 2;
480
481             create_var (r, &fmt, 0, ds_cstr (&name), -1);
482
483             ds_destroy (&name);
484           }
485           break;
486
487         case INTERVALOID:
488           {
489             struct string name;
490             ds_init_cstr (&name, var_get_name (var));
491             ds_put_cstr (&name, "-months");
492             fmt.type = FMT_F;
493             fmt.w = 3;
494             fmt.d = 0;
495
496             create_var (r, &fmt, 0, ds_cstr (&name), -1);
497
498             ds_destroy (&name);
499           }
500         default:
501           break;
502         }
503     }
504
505   PQclear (qres);
506
507   qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
508   if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
509     {
510       PQclear (qres);
511       goto error;
512     }
513   PQclear (qres);
514
515   r->cache_size = info->bsize != -1 ? info->bsize: 4096;
516
517   ds_init_empty (&r->fetch_cmd);
518   ds_put_format (&r->fetch_cmd,  "FETCH FORWARD %d FROM pspp", r->cache_size);
519
520   reload_cache (r);
521
522   return casereader_create_sequential
523     (NULL,
524      r->value_cnt,
525      n_cases,
526      &psql_casereader_class, r);
527
528  error:
529   dict_destroy (*dict);
530
531   psql_casereader_destroy (NULL, r);
532   return NULL;
533 }
534
535
536 static void
537 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
538 {
539   struct psql_reader *r = r_;
540   if (r == NULL)
541     return ;
542
543   ds_destroy (&r->fetch_cmd);
544   free (r->vmap);
545   if (r->res) PQclear (r->res);
546   PQfinish (r->conn);
547
548   free (r);
549 }
550
551
552
553 static bool
554 psql_casereader_read (struct casereader *reader UNUSED, void *r_,
555                       struct ccase *cc)
556 {
557   struct psql_reader *r = r_;
558
559   if ( NULL == r->res || r->tuple >= r->cache_size)
560     {
561       if ( ! reload_cache (r) )
562         return false;
563     }
564
565   return set_value (r, cc);
566 }
567
568 static bool
569 set_value (struct psql_reader *r,
570            struct ccase *c)
571 {
572   int i;
573   int n_vars;
574
575   assert (r->res);
576
577   n_vars = PQnfields (r->res);
578
579   if ( r->tuple >= PQntuples (r->res))
580     return false;
581
582   case_create (c, r->value_cnt);
583   memset (case_data_rw_idx (c, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
584
585
586   for (i = 0 ; i < n_vars ; ++i )
587     {
588       Oid type = PQftype (r->res, i);
589       const struct variable *v = r->vmap[i];
590       union value *val = case_data_rw (c, v);
591
592       union value *val1 = NULL;
593
594       switch (type)
595         {
596         case INTERVALOID:
597         case TIMESTAMPTZOID:
598         case TIMETZOID:
599           if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
600             {
601               const struct variable *v1 = NULL;
602               v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
603
604               val1 = case_data_rw (c, v1);
605             }
606           break;
607         default:
608           break;
609         }
610
611
612       if (PQgetisnull (r->res, r->tuple, i))
613         {
614           value_set_missing (val, var_get_width (v));
615
616           switch (type)
617             {
618             case INTERVALOID:
619             case TIMESTAMPTZOID:
620             case TIMETZOID:
621               val1->f = SYSMIS;
622               break;
623             default:
624               break;
625             }
626         }
627       else
628         {
629           const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
630           int length = PQgetlength (r->res, r->tuple, i);
631
632           int var_width = var_get_width (v);
633           switch (type)
634             {
635             case BOOLOID:
636               {
637                 int8_t x;
638                 GET_VALUE (&vptr, x);
639                 val->f = x;
640               }
641               break;
642
643             case OIDOID:
644             case INT2OID:
645               {
646                 int16_t x;
647                 GET_VALUE (&vptr, x);
648                 val->f = x;
649               }
650               break;
651
652             case INT4OID:
653               {
654                 int32_t x;
655                 GET_VALUE (&vptr, x);
656                 val->f = x;
657               }
658               break;
659
660             case INT8OID:
661               {
662                 int64_t x;
663                 GET_VALUE (&vptr, x);
664                 val->f = x;
665               }
666               break;
667
668             case FLOAT4OID:
669               {
670                 float n;
671                 GET_VALUE (&vptr, n);
672                 val->f = n;
673               }
674               break;
675
676             case FLOAT8OID:
677               {
678                 double n;
679                 GET_VALUE (&vptr, n);
680                 val->f = n;
681               }
682               break;
683
684             case CASHOID:
685               {
686                 /* Postgres 8.3 uses 64 bits.
687                    Earlier versions use 32 */
688                 switch (length)
689                   {
690                   case 8:
691                     {
692                       int64_t x;
693                       GET_VALUE (&vptr, x);
694                       val->f = x / 100.0;
695                     }
696                     break;
697                   case 4:
698                     {
699                       int32_t x;
700                       GET_VALUE (&vptr, x);
701                       val->f = x / 100.0;
702                     }
703                     break;
704                   default:
705                     val->f = SYSMIS;
706                     break;
707                   }
708               }
709               break;
710
711             case INTERVALOID:
712               {
713                 if ( r->integer_datetimes )
714                   {
715                     uint32_t months;
716                     uint32_t days;
717                     uint32_t us;
718                     uint32_t things;
719
720                     GET_VALUE (&vptr, things);
721                     GET_VALUE (&vptr, us);
722                     GET_VALUE (&vptr, days);
723                     GET_VALUE (&vptr, months);
724
725                     val->f = us / 1000000.0;
726                     val->f += days * 24 * 3600;
727
728                     val1->f = months;
729                   }
730                 else
731                   {
732                     uint32_t days, months;
733                     double seconds;
734
735                     GET_VALUE (&vptr, seconds);
736                     GET_VALUE (&vptr, days);
737                     GET_VALUE (&vptr, months);
738
739                     val->f = seconds;
740                     val->f += days * 24 * 3600;
741
742                     val1->f = months;
743                   }
744               }
745               break;
746
747             case DATEOID:
748               {
749                 int32_t x;
750
751                 GET_VALUE (&vptr, x);
752
753                 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
754               }
755               break;
756
757             case TIMEOID:
758               {
759                 if ( r->integer_datetimes)
760                   {
761                     uint64_t x;
762                     GET_VALUE (&vptr, x);
763                     val->f = x / 1000000.0;
764                   }
765                 else
766                   {
767                     double x;
768                     GET_VALUE (&vptr, x);
769                     val->f = x;
770                   }
771               }
772               break;
773
774             case TIMETZOID:
775               {
776                 int32_t zone;
777                 if ( r->integer_datetimes)
778                   {
779                     uint64_t x;
780
781
782                     GET_VALUE (&vptr, x);
783                     val->f = x / 1000000.0;
784                   }
785                 else
786                   {
787                     double x;
788
789                     GET_VALUE (&vptr, x);
790                     val->f = x ;
791                   }
792
793                 GET_VALUE (&vptr, zone);
794                 val1->f = zone / 3600.0;
795               }
796               break;
797
798             case TIMESTAMPOID:
799             case TIMESTAMPTZOID:
800               {
801                 if ( r->integer_datetimes)
802                   {
803                     int64_t x;
804
805                     GET_VALUE (&vptr, x);
806
807                     x /= 1000000;
808
809                     val->f = (x + r->postgres_epoch * 24 * 3600 );
810                   }
811                 else
812                   {
813                     double x;
814
815                     GET_VALUE (&vptr, x);
816
817                     val->f = (x + r->postgres_epoch * 24 * 3600 );
818                   }
819               }
820               break;
821             case TEXTOID:
822             case VARCHAROID:
823             case BPCHAROID:
824             case BYTEAOID:
825               memcpy (val->s, (char *) vptr, MIN (length, var_width));
826               break;
827
828             case NUMERICOID:
829               {
830                 double f = 0.0;
831                 int i;
832                 int16_t n_digits, weight, dscale;
833                 uint16_t sign;
834
835                 GET_VALUE (&vptr, n_digits);
836                 GET_VALUE (&vptr, weight);
837                 GET_VALUE (&vptr, sign);
838                 GET_VALUE (&vptr, dscale);
839
840 #if 0
841                 {
842                   struct fmt_spec fmt;
843                   fmt.d = dscale;
844                   fmt.type = FMT_E;
845                   fmt.w = fmt_max_output_width (fmt.type) ;
846                   fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
847                   var_set_both_formats (v, &fmt);
848                 }
849 #endif
850
851                 for (i = 0 ; i < n_digits;  ++i)
852                   {
853                     uint16_t x;
854                     GET_VALUE (&vptr, x);
855                     f += x * pow (10000, weight--);
856                   }
857
858                 if ( sign == 0x4000)
859                   f *= -1.0;
860
861                 if ( sign == 0xC000)
862                   val->f = SYSMIS;
863                 else
864                   val->f = f;
865               }
866               break;
867
868             default:
869               val->f = SYSMIS;
870               break;
871             }
872         }
873     }
874
875   r->tuple++;
876
877   return true;
878 }
879
880 #endif