treewide: Replace <name>_cnt by n_<name>s and <name>_cap by allocated_<name>.
[pspp] / src / data / psql-reader.c
index 20aa57dbfc17f28a8c342e763e137c4580d35fcb..ae56d18212b5eb86873835e6e8dc150285f544b6 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - a program for statistical analysis.
-   Copyright (C) 2008 Free Software Foundation, Inc.
+   Copyright (C) 2008, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
 
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
 
 #include <config.h>
 
-#include <data/casereader-provider.h>
-#include <libpspp/message.h>
-#include <gl/xalloc.h>
-#include <data/dictionary.h>
+#include "data/psql-reader.h"
+
+#include <inttypes.h>
+#include <math.h>
 #include <stdlib.h>
 
-#include "psql-reader.h"
-#include "variable.h"
-#include "format.h"
-#include "calendar.h"
+#include "data/calendar.h"
+#include "data/casereader-provider.h"
+#include "data/dictionary.h"
+#include "data/format.h"
+#include "data/variable.h"
+#include "libpspp/i18n.h"
+#include "libpspp/message.h"
+#include "libpspp/misc.h"
+#include "libpspp/str.h"
 
-#include <inttypes.h>
+#include "gl/c-strcase.h"
+#include "gl/minmax.h"
+#include "gl/xalloc.h"
 
 #include "gettext.h"
 #define _(msgid) gettext (msgid)
@@ -36,7 +43,7 @@
 
 #if !PSQL_SUPPORT
 struct casereader *
-psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
+psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
 {
   msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
 
@@ -49,6 +56,9 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
 #include <libpq-fe.h>
 
 
+/* Default width of string variables. */
+#define PSQL_DEFAULT_WIDTH 8
+
 /* These macros  must be the same as in catalog/pg_types.h from the postgres source */
 #define BOOLOID            16
 #define BYTEAOID           17
@@ -74,8 +84,7 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
 
 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
 
-static bool psql_casereader_read (struct casereader *, void *,
-                                 struct ccase *);
+static struct ccase *psql_casereader_read (struct casereader *, void *);
 
 static const struct casereader_class psql_casereader_class =
   {
@@ -88,26 +97,27 @@ static const struct casereader_class psql_casereader_class =
 struct psql_reader
 {
   PGconn *conn;
+  PGresult *res;
+  int tuple;
 
   bool integer_datetimes;
 
   double postgres_epoch;
 
-  size_t value_cnt;
+  struct caseproto *proto;
   struct dictionary *dict;
 
-  bool used_first_case;
-  struct ccase first_case;
-
   /* An array of ints, which maps psql column numbers into
-     pspp variable numbers */
-  int *vmap;
+     pspp variables */
+  struct variable **vmap;
   size_t vmapsize;
+
+  struct string fetch_cmd;
+  int cache_size;
 };
 
 
-static void set_value (const struct psql_reader *r,
-                      PGresult *res, struct ccase *c);
+static struct ccase *set_value (struct psql_reader *r);
 
 
 
@@ -118,7 +128,7 @@ data_to_native (const void *in_, void *out_, int len)
   int i;
   const unsigned char *in = in_;
   unsigned char *out = out_;
-  for (i = 0 ; i < len ; ++i )
+  for (i = 0 ; i < len ; ++i)
     out[i] = in[i];
 }
 #else
@@ -128,7 +138,7 @@ data_to_native (const void *in_, void *out_, int len)
   int i;
   const unsigned char *in = in_;
   unsigned char *out = out_;
-  for (i = 0 ; i < len ; ++i )
+  for (i = 0 ; i < len ; ++i)
     out[len - i - 1] = in[i];
 }
 #endif
@@ -156,7 +166,7 @@ dump (const unsigned char *x, int l)
 
   for (i = 0; i < l ; ++i)
     {
-      if ( isprint (x[i]))
+      if (isprint (x[i]))
        printf ("%c ", x[i]);
       else
        printf ("   ");
@@ -171,53 +181,69 @@ create_var (struct psql_reader *r, const struct fmt_spec *fmt,
            int width, const char *suggested_name, int col)
 {
   unsigned long int vx = 0;
-  int vidx;
   struct variable *var;
-  char name[VAR_NAME_LEN + 1];
-
-  r->value_cnt += value_cnt_from_width (width);
-
-  if ( ! dict_make_unique_var_name (r->dict, suggested_name, &vx, name))
-    {
-      msg (ME, _("Cannot create variable name from %s"), suggested_name);
-      return NULL;
-    }
+  char *name;
 
+  name = dict_make_unique_var_name (r->dict, suggested_name, &vx);
   var = dict_create_var (r->dict, name, width);
-  var_set_both_formats (var, fmt);
+  free (name);
 
-  vidx = var_get_dict_index (var);
+  var_set_both_formats (var, fmt);
 
-  if ( col != -1)
+  if (col != -1)
     {
-      r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (int));
+      r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
 
-      r->vmap[col] = vidx;
+      r->vmap[col] = var;
       r->vmapsize = col + 1;
     }
 
   return var;
 }
 
+
+
+
+/* Fill the cache */
+static bool
+reload_cache (struct psql_reader *r)
+{
+  PQclear (r->res);
+  r->tuple = 0;
+
+  r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
+
+  if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
+    {
+      PQclear (r->res);
+      r->res = NULL;
+      return false;
+    }
+
+  return true;
+}
+
+
 struct casereader *
 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
 {
   int i;
-  int n_fields;
-  PGresult *res = NULL;
+  int n_fields, n_tuples;
+  PGresult *qres = NULL;
+  casenumber n_cases = CASENUMBER_MAX;
+  const char *encoding;
 
-  struct psql_reader *r = xzalloc (sizeof *r);
+  struct psql_reader *r = XZALLOC (struct psql_reader);
   struct string query ;
 
-
   r->conn = PQconnectdb (info->conninfo);
-  if ( NULL == r->conn)
+  if (NULL == r->conn)
     {
       msg (ME, _("Memory error whilst opening psql source"));
       goto error;
     }
 
-  if ( PQstatus (r->conn) != CONNECTION_OK )
+  if (PQstatus (r->conn) != CONNECTION_OK)
     {
       msg (ME, _("Error opening psql source: %s."),
           PQerrorMessage (r->conn));
@@ -226,12 +252,12 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
     }
 
   {
-    int v1;
+    int ver_num = 0;
     const char *vers = PQparameterStatus (r->conn, "server_version");
 
-    sscanf (vers, "%d", &v1);
+    sscanf (vers, "%d", &ver_num);
 
-    if ( v1 < 8)
+    if (ver_num < 8)
       {
        msg (ME,
             _("Postgres server is version %s."
@@ -245,11 +271,11 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
   {
     const char *dt =  PQparameterStatus (r->conn, "integer_datetimes");
 
-    r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
+    r->integer_datetimes = (0 == c_strcasecmp (dt, "on"));
   }
 
 #if USE_SSL
-  if ( PQgetssl (r->conn) == NULL)
+  if (PQgetssl (r->conn) == NULL)
 #endif
     {
       if (! info->allow_clear)
@@ -260,48 +286,103 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
        }
     }
 
-  r->postgres_epoch =
-    calendar_gregorian_to_offset (2000, 1, 1, NULL, NULL);
+  r->postgres_epoch = calendar_gregorian_to_offset (
+    2000, 1, 1, settings_get_fmt_settings (), NULL);
 
+  {
+    const int enc = PQclientEncoding (r->conn);
+
+    /* According to section 22.2 of the Postgresql manual
+       a value of zero (SQL_ASCII) indicates
+       "a declaration of ignorance about the encoding".
+       Accordingly, we use the default encoding
+       if we find this value.
+    */
+    encoding = enc ? pg_encoding_to_char (enc) : get_default_encoding ();
+
+    /* Create the dictionary and populate it */
+    *dict = r->dict = dict_create (encoding);
+  }
 
-  /* Create the dictionary and populate it */
-  *dict = r->dict = dict_create ();
+  const int version = PQserverVersion (r->conn);
+  ds_init_empty (&query);
+  /*
+    Versions before 9.1 don't have the REPEATABLE READ isolation level.
+    However according to <a12321aabb@gmail.com> if the server is in the
+    "hot standby" mode then SERIALIZABLE won't work.
+   */
+  ds_put_c_format (&query,
+                  "BEGIN READ ONLY ISOLATION LEVEL %s; "
+                  "DECLARE  pspp BINARY CURSOR FOR ",
+                  (version < 90100) ? "SERIALIZABLE" : "REPEATABLE READ");
 
-  ds_init_cstr (&query, "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; DECLARE  pspp BINARY CURSOR FOR ");
   ds_put_substring (&query, info->sql.ss);
 
-  res = PQexec (r->conn, ds_cstr (&query));
+  qres = PQexec (r->conn, ds_cstr (&query));
   ds_destroy (&query);
-  if ( PQresultStatus (res) != PGRES_COMMAND_OK )
+  if (PQresultStatus (qres) != PGRES_COMMAND_OK)
     {
       msg (ME, _("Error from psql source: %s."),
-          PQresultErrorMessage (res));
+          PQresultErrorMessage (qres));
       goto error;
     }
 
-  PQclear (res);
+  PQclear (qres);
+
+
+  /* Now use the count() function to find the total number of cases
+     that this query returns.
+     Doing this incurs some overhead.  The server has to iterate every
+     case in order to find this number.  However, it's performed on the
+     server side, and in all except the most huge databases the extra
+     overhead will be worth the effort.
+     On the other hand, most PSPP functions don't need to know this.
+     The GUI is the notable exception.
+  */
+  ds_init_cstr (&query, "SELECT count (*) FROM (");
+  ds_put_substring (&query, info->sql.ss);
+  ds_put_cstr (&query, ") stupid_sql_standard");
 
-  res = PQexec (r->conn, "FETCH FIRST FROM pspp");
-  if ( PQresultStatus (res) != PGRES_TUPLES_OK )
+  qres = PQexec (r->conn, ds_cstr (&query));
+  ds_destroy (&query);
+  if (PQresultStatus (qres) != PGRES_TUPLES_OK)
     {
       msg (ME, _("Error from psql source: %s."),
-          PQresultErrorMessage (res));
+          PQresultErrorMessage (qres));
       goto error;
     }
+  n_cases = atol (PQgetvalue (qres, 0, 0));
+  PQclear (qres);
 
-  n_fields = PQnfields (res);
+  qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
+  if (PQresultStatus (qres) != PGRES_TUPLES_OK)
+    {
+      msg (ME, _("Error from psql source: %s."),
+          PQresultErrorMessage (qres));
+      goto error;
+    }
 
-  r->value_cnt = 0;
+  n_tuples = PQntuples (qres);
+  n_fields = PQnfields (qres);
+
+  r->proto = NULL;
   r->vmap = NULL;
   r->vmapsize = 0;
 
-  for (i = 0 ; i < n_fields ; ++i )
+  for (i = 0 ; i < n_fields ; ++i)
     {
       struct variable *var;
-      struct fmt_spec fmt = {FMT_F, 8, 2};
-      Oid type = PQftype (res, i);
+      struct fmt_spec fmt = { .type = FMT_F, .w = 8, .d = 2 };
+      Oid type = PQftype (qres, i);
       int width = 0;
-      int length = PQgetlength (res, 0, i);
+      int length ;
+
+      /* If there are no data then make a finger in the air
+        guess at the contents */
+      if (n_tuples > 0)
+       length = PQgetlength (qres, 0, i);
+      else
+       length = PSQL_DEFAULT_WIDTH;
 
       switch (type)
        {
@@ -328,13 +409,13 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
        case BPCHAROID:
          fmt.type = FMT_A;
          width = (info->str_width == -1) ?
-           ROUND_UP (length, MAX_SHORT_STRING) : info->str_width;
+           ROUND_UP (length, PSQL_DEFAULT_WIDTH) : info->str_width;
          fmt.w = width;
          fmt.d = 0;
           break;
        case BYTEAOID:
          fmt.type = FMT_AHEX;
-         width = length > 0 ? length : MAX_SHORT_STRING;
+         width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
          fmt.w = width * 2;
          fmt.d = 0;
          break;
@@ -373,14 +454,35 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
        default:
           msg (MW, _("Unsupported OID %d.  SYSMIS values will be inserted."), type);
          fmt.type = FMT_A;
-         width = length > 0 ? length : MAX_SHORT_STRING;
+         width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
          fmt.w = width ;
          fmt.d = 0;
-
          break;
        }
 
-      var = create_var (r, &fmt, width, PQfname (res, i), i);
+      if (width == 0 && fmt_is_string (fmt.type))
+       fmt.w = width = PSQL_DEFAULT_WIDTH;
+
+
+      var = create_var (r, &fmt, width, PQfname (qres, i), i);
+      if (type == NUMERICOID && n_tuples > 0)
+       {
+         const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
+         struct fmt_spec fmt;
+         int16_t n_digits, weight, dscale;
+         uint16_t sign;
+
+         GET_VALUE (&vptr, n_digits);
+         GET_VALUE (&vptr, weight);
+         GET_VALUE (&vptr, sign);
+         GET_VALUE (&vptr, dscale);
+
+         fmt.d = dscale;
+         fmt.type = FMT_E;
+         fmt.w = fmt_max_output_width (fmt.type) ;
+         fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
+         var_set_both_formats (var, &fmt);
+       }
 
       /* Timezones need an extra variable */
       switch (type)
@@ -416,30 +518,34 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
        default:
          break;
        }
-
     }
 
-  /* Create the first case, and cache it */
-  r->used_first_case = false;
+  PQclear (qres);
 
+  qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
+  if (PQresultStatus (qres) != PGRES_COMMAND_OK)
+    {
+      PQclear (qres);
+      goto error;
+    }
+  PQclear (qres);
 
-  case_create (&r->first_case, r->value_cnt);
-  memset (case_data_rw_idx (&r->first_case, 0)->s,
-         ' ', MAX_SHORT_STRING * r->value_cnt);
+  r->cache_size = info->bsize != -1 ? info->bsize: 4096;
 
-  set_value (r, res, &r->first_case);
+  ds_init_empty (&r->fetch_cmd);
+  ds_put_format (&r->fetch_cmd,  "FETCH FORWARD %d FROM pspp", r->cache_size);
 
-  PQclear (res);
+  reload_cache (r);
+  r->proto = caseproto_ref (dict_get_proto (*dict));
 
   return casereader_create_sequential
     (NULL,
-     r->value_cnt,
-     CASENUMBER_MAX,
+     r->proto,
+     n_cases,
      &psql_casereader_class, r);
 
  error:
-  PQclear (res);
-  dict_destroy (*dict);
+  dict_unref (*dict);
 
   psql_casereader_destroy (NULL, r);
   return NULL;
@@ -453,69 +559,76 @@ psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
   if (r == NULL)
     return ;
 
+  ds_destroy (&r->fetch_cmd);
   free (r->vmap);
+  if (r->res) PQclear (r->res);
   PQfinish (r->conn);
+  caseproto_unref (r->proto);
 
   free (r);
 }
 
-static bool
-psql_casereader_read (struct casereader *reader UNUSED, void *r_,
-                     struct ccase *cc)
-{
-  PGresult *res;
 
+
+static struct ccase *
+psql_casereader_read (struct casereader *reader UNUSED, void *r_)
+{
   struct psql_reader *r = r_;
 
-  if ( !r->used_first_case )
+  if (NULL == r->res || r->tuple >= r->cache_size)
     {
-      *cc = r->first_case;
-      r->used_first_case = true;
-      return true;
+      if (! reload_cache (r))
+       return false;
     }
 
-  case_create (cc, r->value_cnt);
-  memset (case_data_rw_idx (cc, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
+  return set_value (r);
+}
 
-  res = PQexec (r->conn, "FETCH NEXT FROM pspp");
-  if ( PQresultStatus (res) != PGRES_TUPLES_OK || PQntuples (res) < 1)
-    {
-      PQclear (res);
-      case_destroy (cc);
-      return false;
-    }
+static struct ccase *
+set_value (struct psql_reader *r)
+{
+  struct ccase *c;
+  int n_vars;
+  int i;
 
-  set_value (r, res, cc);
+  assert (r->res);
 
-  PQclear (res);
+  n_vars = PQnfields (r->res);
 
-  return true;
-}
+  if (r->tuple >= PQntuples (r->res))
+    return NULL;
 
-static void
-set_value (const struct psql_reader *r,
-          PGresult *res, struct ccase *c)
-{
-  int i;
-  int n_vars = PQnfields (res);
+  c = case_create (r->proto);
+  case_set_missing (c);
 
-  for (i = 0 ; i < n_vars ; ++i )
+
+  for (i = 0 ; i < n_vars ; ++i)
     {
-      Oid type = PQftype (res, i);
-      struct variable *v = dict_get_var (r->dict, r->vmap[i]);
+      Oid type = PQftype (r->res, i);
+      const struct variable *v = r->vmap[i];
       union value *val = case_data_rw (c, v);
-      const struct variable *v1 = NULL;
+
       union value *val1 = NULL;
 
-      if (i < r->vmapsize && r->vmap[i] + 1 < dict_get_var_cnt (r->dict))
+      switch (type)
        {
-         v1 = dict_get_var (r->dict, r->vmap[i] + 1);
+       case INTERVALOID:
+       case TIMESTAMPTZOID:
+       case TIMETZOID:
+         if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_n_vars (r->dict))
+           {
+             const struct variable *v1 = NULL;
+             v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
 
-         val1 = case_data_rw (c, v1);
+             val1 = case_data_rw (c, v1);
+           }
+         break;
+       default:
+         break;
        }
 
 
-      if (PQgetisnull (res, 0, i))
+      if (PQgetisnull (r->res, r->tuple, i))
        {
          value_set_missing (val, var_get_width (v));
 
@@ -532,8 +645,8 @@ set_value (const struct psql_reader *r,
        }
       else
        {
-         const uint8_t *vptr = (const uint8_t *) PQgetvalue (res, 0, i);
-         int length = PQgetlength (res, 0, i);
+         const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
+         int length = PQgetlength (r->res, r->tuple, i);
 
          int var_width = var_get_width (v);
          switch (type)
@@ -589,15 +702,34 @@ set_value (const struct psql_reader *r,
 
            case CASHOID:
              {
-               int32_t x;
-               GET_VALUE (&vptr, x);
-               val->f = x / 100.0;
+               /* Postgres 8.3 uses 64 bits.
+                  Earlier versions use 32 */
+               switch (length)
+                 {
+                 case 8:
+                   {
+                     int64_t x;
+                     GET_VALUE (&vptr, x);
+                     val->f = x / 100.0;
+                   }
+                   break;
+                 case 4:
+                   {
+                     int32_t x;
+                     GET_VALUE (&vptr, x);
+                     val->f = x / 100.0;
+                   }
+                   break;
+                 default:
+                   val->f = SYSMIS;
+                   break;
+                 }
              }
              break;
 
            case INTERVALOID:
              {
-               if ( r->integer_datetimes )
+               if (r->integer_datetimes)
                  {
                    uint32_t months;
                    uint32_t days;
@@ -643,7 +775,7 @@ set_value (const struct psql_reader *r,
 
            case TIMEOID:
              {
-               if ( r->integer_datetimes)
+               if (r->integer_datetimes)
                  {
                    uint64_t x;
                    GET_VALUE (&vptr, x);
@@ -661,7 +793,7 @@ set_value (const struct psql_reader *r,
            case TIMETZOID:
              {
                int32_t zone;
-               if ( r->integer_datetimes)
+               if (r->integer_datetimes)
                  {
                    uint64_t x;
 
@@ -685,7 +817,7 @@ set_value (const struct psql_reader *r,
            case TIMESTAMPOID:
            case TIMESTAMPTZOID:
              {
-               if ( r->integer_datetimes)
+               if (r->integer_datetimes)
                  {
                    int64_t x;
 
@@ -693,7 +825,7 @@ set_value (const struct psql_reader *r,
 
                    x /= 1000000;
 
-                   val->f = (x + r->postgres_epoch * 24 * 3600 );
+                   val->f = (x + r->postgres_epoch * 24 * 3600);
                  }
                else
                  {
@@ -701,7 +833,7 @@ set_value (const struct psql_reader *r,
 
                    GET_VALUE (&vptr, x);
 
-                   val->f = (x + r->postgres_epoch * 24 * 3600 );
+                   val->f = (x + r->postgres_epoch * 24 * 3600);
                  }
              }
              break;
@@ -709,7 +841,7 @@ set_value (const struct psql_reader *r,
            case VARCHAROID:
            case BPCHAROID:
            case BYTEAOID:
-             memcpy (val->s, (char *) vptr, MIN (length, var_width));
+             memcpy (val->s, vptr, MIN (length, var_width));
              break;
 
            case NUMERICOID:
@@ -724,6 +856,7 @@ set_value (const struct psql_reader *r,
                GET_VALUE (&vptr, sign);
                GET_VALUE (&vptr, dscale);
 
+#if 0
                {
                  struct fmt_spec fmt;
                  fmt.d = dscale;
@@ -732,6 +865,7 @@ set_value (const struct psql_reader *r,
                  fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
                  var_set_both_formats (v, &fmt);
                }
+#endif
 
                for (i = 0 ; i < n_digits;  ++i)
                  {
@@ -740,10 +874,10 @@ set_value (const struct psql_reader *r,
                    f += x * pow (10000, weight--);
                  }
 
-               if ( sign == 0x4000)
+               if (sign == 0x4000)
                  f *= -1.0;
 
-               if ( sign == 0xC000)
+               if (sign == 0xC000)
                  val->f = SYSMIS;
                else
                  val->f = f;
@@ -756,6 +890,10 @@ set_value (const struct psql_reader *r,
            }
        }
     }
+
+  r->tuple++;
+
+  return c;
 }
 
 #endif