treewide: Replace <name>_cnt by n_<name>s and <name>_cap by allocated_<name>.
[pspp] / src / data / psql-reader.c
index 346d214ad31e6d43050dccba941979390cb5cf33..ae56d18212b5eb86873835e6e8dc150285f544b6 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - a program for statistical analysis.
-   Copyright (C) 2008, 2009, 2010, 2011 Free Software Foundation, Inc.
+   Copyright (C) 2008, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
 
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
 #include "data/dictionary.h"
 #include "data/format.h"
 #include "data/variable.h"
+#include "libpspp/i18n.h"
 #include "libpspp/message.h"
 #include "libpspp/misc.h"
 #include "libpspp/str.h"
 
-#include "gl/xalloc.h"
+#include "gl/c-strcase.h"
 #include "gl/minmax.h"
+#include "gl/xalloc.h"
 
 #include "gettext.h"
 #define _(msgid) gettext (msgid)
@@ -126,7 +128,7 @@ data_to_native (const void *in_, void *out_, int len)
   int i;
   const unsigned char *in = in_;
   unsigned char *out = out_;
-  for (i = 0 ; i < len ; ++i )
+  for (i = 0 ; i < len ; ++i)
     out[i] = in[i];
 }
 #else
@@ -136,7 +138,7 @@ data_to_native (const void *in_, void *out_, int len)
   int i;
   const unsigned char *in = in_;
   unsigned char *out = out_;
-  for (i = 0 ; i < len ; ++i )
+  for (i = 0 ; i < len ; ++i)
     out[len - i - 1] = in[i];
 }
 #endif
@@ -164,7 +166,7 @@ dump (const unsigned char *x, int l)
 
   for (i = 0; i < l ; ++i)
     {
-      if ( isprint (x[i]))
+      if (isprint (x[i]))
        printf ("%c ", x[i]);
       else
        printf ("   ");
@@ -188,7 +190,7 @@ create_var (struct psql_reader *r, const struct fmt_spec *fmt,
 
   var_set_both_formats (var, fmt);
 
-  if ( col != -1)
+  if (col != -1)
     {
       r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
 
@@ -229,18 +231,19 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
   int n_fields, n_tuples;
   PGresult *qres = NULL;
   casenumber n_cases = CASENUMBER_MAX;
+  const char *encoding;
 
-  struct psql_reader *r = xzalloc (sizeof *r);
+  struct psql_reader *r = XZALLOC (struct psql_reader);
   struct string query ;
 
   r->conn = PQconnectdb (info->conninfo);
-  if ( NULL == r->conn)
+  if (NULL == r->conn)
     {
       msg (ME, _("Memory error whilst opening psql source"));
       goto error;
     }
 
-  if ( PQstatus (r->conn) != CONNECTION_OK )
+  if (PQstatus (r->conn) != CONNECTION_OK)
     {
       msg (ME, _("Error opening psql source: %s."),
           PQerrorMessage (r->conn));
@@ -249,12 +252,12 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
     }
 
   {
-    int ver_num;
+    int ver_num = 0;
     const char *vers = PQparameterStatus (r->conn, "server_version");
 
     sscanf (vers, "%d", &ver_num);
 
-    if ( ver_num < 8)
+    if (ver_num < 8)
       {
        msg (ME,
             _("Postgres server is version %s."
@@ -268,11 +271,11 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
   {
     const char *dt =  PQparameterStatus (r->conn, "integer_datetimes");
 
-    r->integer_datetimes = ( 0 == strcasecmp (dt, "on"));
+    r->integer_datetimes = (0 == c_strcasecmp (dt, "on"));
   }
 
 #if USE_SSL
-  if ( PQgetssl (r->conn) == NULL)
+  if (PQgetssl (r->conn) == NULL)
 #endif
     {
       if (! info->allow_clear)
@@ -283,11 +286,8 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
        }
     }
 
-  r->postgres_epoch = calendar_gregorian_to_offset (2000, 1, 1, NULL);
-
-
-  /* Create the dictionary and populate it */
-  *dict = r->dict = dict_create ();
+  r->postgres_epoch = calendar_gregorian_to_offset (
+    2000, 1, 1, settings_get_fmt_settings (), NULL);
 
   {
     const int enc = PQclientEncoding (r->conn);
@@ -295,25 +295,32 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
     /* According to section 22.2 of the Postgresql manual
        a value of zero (SQL_ASCII) indicates
        "a declaration of ignorance about the encoding".
-       Accordingly, we don't set the dictionary's encoding
+       Accordingly, we use the default encoding
        if we find this value.
     */
-    if ( enc != 0 )
-      dict_set_encoding (r->dict, pg_encoding_to_char (enc));
+    encoding = enc ? pg_encoding_to_char (enc) : get_default_encoding ();
+
+    /* Create the dictionary and populate it */
+    *dict = r->dict = dict_create (encoding);
   }
 
+  const int version = PQserverVersion (r->conn);
+  ds_init_empty (&query);
   /*
-    select count (*) from (select * from medium) stupid_sql_standard;
-  */
-  ds_init_cstr (&query,
-               "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
-               "DECLARE  pspp BINARY CURSOR FOR ");
+    Versions before 9.1 don't have the REPEATABLE READ isolation level.
+    However according to <a12321aabb@gmail.com> if the server is in the
+    "hot standby" mode then SERIALIZABLE won't work.
+   */
+  ds_put_c_format (&query,
+                  "BEGIN READ ONLY ISOLATION LEVEL %s; "
+                  "DECLARE  pspp BINARY CURSOR FOR ",
+                  (version < 90100) ? "SERIALIZABLE" : "REPEATABLE READ");
 
   ds_put_substring (&query, info->sql.ss);
 
   qres = PQexec (r->conn, ds_cstr (&query));
   ds_destroy (&query);
-  if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
+  if (PQresultStatus (qres) != PGRES_COMMAND_OK)
     {
       msg (ME, _("Error from psql source: %s."),
           PQresultErrorMessage (qres));
@@ -338,7 +345,7 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
 
   qres = PQexec (r->conn, ds_cstr (&query));
   ds_destroy (&query);
-  if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
+  if (PQresultStatus (qres) != PGRES_TUPLES_OK)
     {
       msg (ME, _("Error from psql source: %s."),
           PQresultErrorMessage (qres));
@@ -348,7 +355,7 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
   PQclear (qres);
 
   qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
-  if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
+  if (PQresultStatus (qres) != PGRES_TUPLES_OK)
     {
       msg (ME, _("Error from psql source: %s."),
           PQresultErrorMessage (qres));
@@ -362,19 +369,19 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
   r->vmap = NULL;
   r->vmapsize = 0;
 
-  for (i = 0 ; i < n_fields ; ++i )
+  for (i = 0 ; i < n_fields ; ++i)
     {
       struct variable *var;
-      struct fmt_spec fmt = {FMT_F, 8, 2};
+      struct fmt_spec fmt = { .type = FMT_F, .w = 8, .d = 2 };
       Oid type = PQftype (qres, i);
       int width = 0;
       int length ;
 
-      /* If there are no data then make a finger in the air 
+      /* If there are no data then make a finger in the air
         guess at the contents */
-      if ( n_tuples > 0 )
+      if (n_tuples > 0)
        length = PQgetlength (qres, 0, i);
-      else 
+      else
        length = PSQL_DEFAULT_WIDTH;
 
       switch (type)
@@ -453,12 +460,12 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
          break;
        }
 
-      if ( width == 0 && fmt_is_string (fmt.type))
+      if (width == 0 && fmt_is_string (fmt.type))
        fmt.w = width = PSQL_DEFAULT_WIDTH;
 
 
       var = create_var (r, &fmt, width, PQfname (qres, i), i);
-      if ( type == NUMERICOID && n_tuples > 0)
+      if (type == NUMERICOID && n_tuples > 0)
        {
          const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
          struct fmt_spec fmt;
@@ -516,7 +523,7 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
   PQclear (qres);
 
   qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
-  if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
+  if (PQresultStatus (qres) != PGRES_COMMAND_OK)
     {
       PQclear (qres);
       goto error;
@@ -538,7 +545,7 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
      &psql_casereader_class, r);
 
  error:
-  dict_destroy (*dict);
+  dict_unref (*dict);
 
   psql_casereader_destroy (NULL, r);
   return NULL;
@@ -568,9 +575,9 @@ psql_casereader_read (struct casereader *reader UNUSED, void *r_)
 {
   struct psql_reader *r = r_;
 
-  if ( NULL == r->res || r->tuple >= r->cache_size)
+  if (NULL == r->res || r->tuple >= r->cache_size)
     {
-      if ( ! reload_cache (r) )
+      if (! reload_cache (r))
        return false;
     }
 
@@ -588,14 +595,14 @@ set_value (struct psql_reader *r)
 
   n_vars = PQnfields (r->res);
 
-  if ( r->tuple >= PQntuples (r->res))
+  if (r->tuple >= PQntuples (r->res))
     return NULL;
 
   c = case_create (r->proto);
   case_set_missing (c);
 
 
-  for (i = 0 ; i < n_vars ; ++i )
+  for (i = 0 ; i < n_vars ; ++i)
     {
       Oid type = PQftype (r->res, i);
       const struct variable *v = r->vmap[i];
@@ -608,7 +615,7 @@ set_value (struct psql_reader *r)
        case INTERVALOID:
        case TIMESTAMPTZOID:
        case TIMETZOID:
-         if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
+         if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_n_vars (r->dict))
            {
              const struct variable *v1 = NULL;
              v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
@@ -722,7 +729,7 @@ set_value (struct psql_reader *r)
 
            case INTERVALOID:
              {
-               if ( r->integer_datetimes )
+               if (r->integer_datetimes)
                  {
                    uint32_t months;
                    uint32_t days;
@@ -768,7 +775,7 @@ set_value (struct psql_reader *r)
 
            case TIMEOID:
              {
-               if ( r->integer_datetimes)
+               if (r->integer_datetimes)
                  {
                    uint64_t x;
                    GET_VALUE (&vptr, x);
@@ -786,7 +793,7 @@ set_value (struct psql_reader *r)
            case TIMETZOID:
              {
                int32_t zone;
-               if ( r->integer_datetimes)
+               if (r->integer_datetimes)
                  {
                    uint64_t x;
 
@@ -810,7 +817,7 @@ set_value (struct psql_reader *r)
            case TIMESTAMPOID:
            case TIMESTAMPTZOID:
              {
-               if ( r->integer_datetimes)
+               if (r->integer_datetimes)
                  {
                    int64_t x;
 
@@ -818,7 +825,7 @@ set_value (struct psql_reader *r)
 
                    x /= 1000000;
 
-                   val->f = (x + r->postgres_epoch * 24 * 3600 );
+                   val->f = (x + r->postgres_epoch * 24 * 3600);
                  }
                else
                  {
@@ -826,7 +833,7 @@ set_value (struct psql_reader *r)
 
                    GET_VALUE (&vptr, x);
 
-                   val->f = (x + r->postgres_epoch * 24 * 3600 );
+                   val->f = (x + r->postgres_epoch * 24 * 3600);
                  }
              }
              break;
@@ -834,8 +841,7 @@ set_value (struct psql_reader *r)
            case VARCHAROID:
            case BPCHAROID:
            case BYTEAOID:
-             memcpy (value_str_rw (val, var_width), vptr,
-                      MIN (length, var_width));
+             memcpy (val->s, vptr, MIN (length, var_width));
              break;
 
            case NUMERICOID:
@@ -868,10 +874,10 @@ set_value (struct psql_reader *r)
                    f += x * pow (10000, weight--);
                  }
 
-               if ( sign == 0x4000)
+               if (sign == 0x4000)
                  f *= -1.0;
 
-               if ( sign == 0xC000)
+               if (sign == 0xC000)
                  val->f = SYSMIS;
                else
                  val->f = f;