Optimise the psql reader, by fetching more than one record at a time.
authorJohn Darrington <john@darrington.wattle.id.au>
Wed, 6 Feb 2008 02:08:18 +0000 (02:08 +0000)
committerJohn Darrington <john@darrington.wattle.id.au>
Wed, 6 Feb 2008 02:08:18 +0000 (02:08 +0000)
Fix a few bugs.  Ask the server to send the number of records when the
casereader is created.

doc/ChangeLog
doc/files.texi
src/data/ChangeLog
src/data/psql-reader.c
src/data/psql-reader.h
src/language/data-io/ChangeLog
src/language/data-io/get-data.c
tests/command/get-data-psql.sh

index 021e234345832587d5b76ce3bdbe4aa0453d0aae..58c8b3335fb3c5bea8491175215b429dfae4cf05 100644 (file)
@@ -1,3 +1,8 @@
+2008-02-06 John Darrington <john@darrington.wattle.id.au>
+
+       * files.texi: Document the /BSIZE subcommand to the PSQL
+       reader.
+
 2008-02-04 John Darrington <john@darrington.wattle.id.au>
 
        * files.texi data-io.texi: Document the GET DATA TYPE=PSQL
 2008-02-04 John Darrington <john@darrington.wattle.id.au>
 
        * files.texi data-io.texi: Document the GET DATA TYPE=PSQL
index df01567eb956efd64df16ca7feff2e4fba3e93e9..ded40e7b9222aea362f9ee5b32f01ab4ec6cd24c 100644 (file)
@@ -265,7 +265,8 @@ GET DATA /TYPE=PSQL
          /CONNECT=@{connection info@}
          /SQL=@{query@}
          [/ASSUMEDVARWIDTH=n]
          /CONNECT=@{connection info@}
          /SQL=@{query@}
          [/ASSUMEDVARWIDTH=n]
-         [/UNENCRYPTED].
+         [/UNENCRYPTED]
+         [/BSIZE=n].
 @end display
 
 @cindex postgres
 @end display
 
 @cindex postgres
@@ -303,6 +304,17 @@ Whether or not the connection is
 encrypted depends upon the underlying psql library and the 
 capabilities of the database server.
 
 encrypted depends upon the underlying psql library and the 
 capabilities of the database server.
 
+The BSIZE subcommand serves only to optimise the speed of data transfer.
+It specifies an upper limit on
+number of cases to fetch from the database at once.
+The default value is 4096.
+If your SQL statement fetches a large number of cases but only a small number of
+variables, then the data transfer may be faster if you increase this value.
+Conversely, if the number of variables is large, or if the machine on which 
+PSPP is running has only a
+small amount of memory, then a smaller value will be better.
+
+
 The following syntax is an example:
 @example
 GET DATA /TYPE=PSQL
 The following syntax is an example:
 @example
 GET DATA /TYPE=PSQL
index 6ab0bb443ab53d862ba64eb50208bfb8ac4e7bf7..b3980052ae156f97a417e78fcc90cb41ec4bddcd 100644 (file)
@@ -1,3 +1,11 @@
+2008-02-06  John Darrington <john@darrington.wattle.id.au>
+
+       psql-reader.c psql-reader.h: Read more than one tuple at
+       once.  Fix bug reading a query which returns no data. Fix bug
+       when transformation followed a reader.
+       Ask the server for the number of records in the query, for the
+       benefit of the gui.
+
 2008-02-05  John Darrington <john@darrington.wattle.id.au>
 
        psql-reader.c: So yesterday they release postgresql 8.3.0
 2008-02-05  John Darrington <john@darrington.wattle.id.au>
 
        psql-reader.c: So yesterday they release postgresql 8.3.0
index f59bcb6dac8521cba01d026ca5cf1a54a524d367..72e14be4fce9542eb8f7c92b6a21563fc2842f0c 100644 (file)
@@ -28,6 +28,7 @@
 #include "calendar.h"
 
 #include <inttypes.h>
 #include "calendar.h"
 
 #include <inttypes.h>
+#include <libpspp/str.h>
 
 #include "gettext.h"
 #define _(msgid) gettext (msgid)
 
 #include "gettext.h"
 #define _(msgid) gettext (msgid)
@@ -88,6 +89,8 @@ static const struct casereader_class psql_casereader_class =
 struct psql_reader
 {
   PGconn *conn;
 struct psql_reader
 {
   PGconn *conn;
+  PGresult *res;
+  int tuple;
 
   bool integer_datetimes;
 
 
   bool integer_datetimes;
 
@@ -96,18 +99,18 @@ struct psql_reader
   size_t value_cnt;
   struct dictionary *dict;
 
   size_t value_cnt;
   struct dictionary *dict;
 
-  bool used_first_case;
-  struct ccase first_case;
-
   /* An array of ints, which maps psql column numbers into
   /* An array of ints, which maps psql column numbers into
-     pspp variable numbers */
-  int *vmap;
+     pspp variables */
+  struct variable **vmap;
   size_t vmapsize;
   size_t vmapsize;
+
+  struct string fetch_cmd;
+  int cache_size;
 };
 
 
 };
 
 
-static void set_value (const struct psql_reader *r,
-                      PGresult *res, struct ccase *c);
+static bool set_value (struct psql_reader *r,
+                      struct ccase *cc);
 
 
 
 
 
 
@@ -171,7 +174,6 @@ create_var (struct psql_reader *r, const struct fmt_spec *fmt,
            int width, const char *suggested_name, int col)
 {
   unsigned long int vx = 0;
            int width, const char *suggested_name, int col)
 {
   unsigned long int vx = 0;
-  int vidx;
   struct variable *var;
   char name[VAR_NAME_LEN + 1];
 
   struct variable *var;
   char name[VAR_NAME_LEN + 1];
 
@@ -186,30 +188,51 @@ create_var (struct psql_reader *r, const struct fmt_spec *fmt,
   var = dict_create_var (r->dict, name, width);
   var_set_both_formats (var, fmt);
 
   var = dict_create_var (r->dict, name, width);
   var_set_both_formats (var, fmt);
 
-  vidx = var_get_dict_index (var);
-
   if ( col != -1)
     {
   if ( col != -1)
     {
-      r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (int));
+      r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
 
 
-      r->vmap[col] = vidx;
+      r->vmap[col] = var;
       r->vmapsize = col + 1;
     }
 
   return var;
 }
 
       r->vmapsize = col + 1;
     }
 
   return var;
 }
 
+
+
+
+/* Fill the cache */
+static bool
+reload_cache (struct psql_reader *r)
+{
+  PQclear (r->res);
+  r->tuple = 0;
+
+  r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
+
+  if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
+    {
+      PQclear (r->res);
+      r->res = NULL;
+      return false;
+    }
+
+  return true;
+}
+
+
 struct casereader *
 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
 {
   int i;
 struct casereader *
 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
 {
   int i;
-  int n_fields;
-  PGresult *res = NULL;
+  int n_fields, n_tuples;
+  PGresult *qres = NULL;
+  casenumber n_cases = CASENUMBER_MAX;
 
   struct psql_reader *r = xzalloc (sizeof *r);
   struct string query ;
 
 
   struct psql_reader *r = xzalloc (sizeof *r);
   struct string query ;
 
-
   r->conn = PQconnectdb (info->conninfo);
   if ( NULL == r->conn)
     {
   r->conn = PQconnectdb (info->conninfo);
   if ( NULL == r->conn)
     {
@@ -226,12 +249,12 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
     }
 
   {
     }
 
   {
-    int v1;
+    int ver_num;
     const char *vers = PQparameterStatus (r->conn, "server_version");
 
     const char *vers = PQparameterStatus (r->conn, "server_version");
 
-    sscanf (vers, "%d", &v1);
+    sscanf (vers, "%d", &ver_num);
 
 
-    if ( v1 < 8)
+    if ( ver_num < 8)
       {
        msg (ME,
             _("Postgres server is version %s."
       {
        msg (ME,
             _("Postgres server is version %s."
@@ -267,29 +290,62 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
   /* Create the dictionary and populate it */
   *dict = r->dict = dict_create ();
 
   /* Create the dictionary and populate it */
   *dict = r->dict = dict_create ();
 
-  ds_init_cstr (&query, "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; DECLARE  pspp BINARY CURSOR FOR ");
+  /*
+    select count (*) from (select * from medium) stupid_sql_standard;
+  */
+
+  ds_init_cstr (&query,
+               "BEGIN READ ONLY ISOLATION LEVEL SERIALIZABLE; "
+               "DECLARE  pspp BINARY CURSOR FOR ");
+
   ds_put_substring (&query, info->sql.ss);
 
   ds_put_substring (&query, info->sql.ss);
 
-  res = PQexec (r->conn, ds_cstr (&query));
+  qres = PQexec (r->conn, ds_cstr (&query));
   ds_destroy (&query);
   ds_destroy (&query);
-  if ( PQresultStatus (res) != PGRES_COMMAND_OK )
+  if ( PQresultStatus (qres) != PGRES_COMMAND_OK )
     {
       msg (ME, _("Error from psql source: %s."),
     {
       msg (ME, _("Error from psql source: %s."),
-          PQresultErrorMessage (res));
+          PQresultErrorMessage (qres));
       goto error;
     }
 
       goto error;
     }
 
-  PQclear (res);
+  PQclear (qres);
+
+
+  /* Now use the count() function to find the total number of cases
+     that this query returns.
+     Doing this incurs some overhead.  The server has to iterate every
+     case in order to find this number.  However, it's performed on the
+     server side, and in all except the most huge databases the extra
+     overhead will be worth the effort.
+     On the other hand, most PSPP functions don't need to know this.
+     The GUI is the notable exception.
+  */
+  ds_init_cstr (&query, "SELECT count (*) FROM (");
+  ds_put_substring (&query, info->sql.ss);
+  ds_put_cstr (&query, ") stupid_sql_standard");
+
+  qres = PQexec (r->conn, ds_cstr (&query));
+  ds_destroy (&query);
+  if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
+    {
+      msg (ME, _("Error from psql source: %s."),
+          PQresultErrorMessage (qres));
+      goto error;
+    }
+  n_cases = atol (PQgetvalue (qres, 0, 0));
+  PQclear (qres);
 
 
-  res = PQexec (r->conn, "FETCH FIRST FROM pspp");
-  if ( PQresultStatus (res) != PGRES_TUPLES_OK )
+  qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
+  if ( PQresultStatus (qres) != PGRES_TUPLES_OK )
     {
       msg (ME, _("Error from psql source: %s."),
     {
       msg (ME, _("Error from psql source: %s."),
-          PQresultErrorMessage (res));
+          PQresultErrorMessage (qres));
       goto error;
     }
 
       goto error;
     }
 
-  n_fields = PQnfields (res);
+  n_tuples = PQntuples (qres);
+  n_fields = PQnfields (qres);
 
   r->value_cnt = 0;
   r->vmap = NULL;
 
   r->value_cnt = 0;
   r->vmap = NULL;
@@ -299,9 +355,16 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
     {
       struct variable *var;
       struct fmt_spec fmt = {FMT_F, 8, 2};
     {
       struct variable *var;
       struct fmt_spec fmt = {FMT_F, 8, 2};
-      Oid type = PQftype (res, i);
+      Oid type = PQftype (qres, i);
       int width = 0;
       int width = 0;
-      int length = PQgetlength (res, 0, i);
+      int length ;
+
+      /* If there are no data then make a finger in the air 
+        guess at the contents */
+      if ( n_tuples > 0 )
+       length = PQgetlength (qres, 0, i);
+      else 
+       length = MAX_SHORT_STRING;
 
       switch (type)
        {
 
       switch (type)
        {
@@ -376,11 +439,28 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
          width = length > 0 ? length : MAX_SHORT_STRING;
          fmt.w = width ;
          fmt.d = 0;
          width = length > 0 ? length : MAX_SHORT_STRING;
          fmt.w = width ;
          fmt.d = 0;
-
          break;
        }
 
          break;
        }
 
-      var = create_var (r, &fmt, width, PQfname (res, i), i);
+      var = create_var (r, &fmt, width, PQfname (qres, i), i);
+      if ( type == NUMERICOID && n_tuples > 0)
+       {
+         const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
+         struct fmt_spec fmt;
+         int16_t n_digits, weight, dscale;
+         uint16_t sign;
+
+         GET_VALUE (&vptr, n_digits);
+         GET_VALUE (&vptr, weight);
+         GET_VALUE (&vptr, sign);
+         GET_VALUE (&vptr, dscale);
+
+         fmt.d = dscale;
+         fmt.type = FMT_E;
+         fmt.w = fmt_max_output_width (fmt.type) ;
+         fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
+         var_set_both_formats (var, &fmt);
+       }
 
       /* Timezones need an extra variable */
       switch (type)
 
       /* Timezones need an extra variable */
       switch (type)
@@ -416,29 +496,32 @@ psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
        default:
          break;
        }
        default:
          break;
        }
-
     }
 
     }
 
-  /* Create the first case, and cache it */
-  r->used_first_case = false;
+  PQclear (qres);
 
 
+  qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
+  if ( PQresultStatus (qres) != PGRES_COMMAND_OK)
+    {
+      PQclear (qres);
+      goto error;
+    }
+  PQclear (qres);
 
 
-  case_create (&r->first_case, r->value_cnt);
-  memset (case_data_rw_idx (&r->first_case, 0)->s,
-         ' ', MAX_SHORT_STRING * r->value_cnt);
+  r->cache_size = info->bsize != -1 ? info->bsize: 4096;
 
 
-  set_value (r, res, &r->first_case);
+  ds_init_empty (&r->fetch_cmd);
+  ds_put_format (&r->fetch_cmd,  "FETCH FORWARD %d FROM pspp", r->cache_size);
 
 
-  PQclear (res);
+  reload_cache (r);
 
   return casereader_create_sequential
     (NULL,
      r->value_cnt,
 
   return casereader_create_sequential
     (NULL,
      r->value_cnt,
-     CASENUMBER_MAX,
+     n_cases,
      &psql_casereader_class, r);
 
  error:
      &psql_casereader_class, r);
 
  error:
-  PQclear (res);
   dict_destroy (*dict);
 
   psql_casereader_destroy (NULL, r);
   dict_destroy (*dict);
 
   psql_casereader_destroy (NULL, r);
@@ -453,69 +536,76 @@ psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
   if (r == NULL)
     return ;
 
   if (r == NULL)
     return ;
 
+  ds_destroy (&r->fetch_cmd);
   free (r->vmap);
   free (r->vmap);
+  if (r->res) PQclear (r->res);
   PQfinish (r->conn);
 
   free (r);
 }
 
   PQfinish (r->conn);
 
   free (r);
 }
 
+
+
 static bool
 psql_casereader_read (struct casereader *reader UNUSED, void *r_,
                      struct ccase *cc)
 {
 static bool
 psql_casereader_read (struct casereader *reader UNUSED, void *r_,
                      struct ccase *cc)
 {
-  PGresult *res;
-
   struct psql_reader *r = r_;
 
   struct psql_reader *r = r_;
 
-  if ( !r->used_first_case )
+  if ( NULL == r->res || r->tuple >= r->cache_size)
     {
     {
-      *cc = r->first_case;
-      r->used_first_case = true;
-      return true;
+      if ( ! reload_cache (r) )
+       return false;
     }
 
     }
 
-  case_create (cc, r->value_cnt);
-  memset (case_data_rw_idx (cc, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
+  return set_value (r, cc);
+}
 
 
-  res = PQexec (r->conn, "FETCH NEXT FROM pspp");
-  if ( PQresultStatus (res) != PGRES_TUPLES_OK || PQntuples (res) < 1)
-    {
-      PQclear (res);
-      case_destroy (cc);
-      return false;
-    }
+static bool
+set_value (struct psql_reader *r,
+          struct ccase *c)
+{
+  int i;
+  int n_vars;
 
 
-  set_value (r, res, cc);
+  assert (r->res);
 
 
-  PQclear (res);
+  n_vars = PQnfields (r->res);
 
 
-  return true;
-}
+  if ( r->tuple >= PQntuples (r->res))
+    return false;
+
+  case_create (c, r->value_cnt);
+  memset (case_data_rw_idx (c, 0)->s, ' ', MAX_SHORT_STRING * r->value_cnt);
 
 
-static void
-set_value (const struct psql_reader *r,
-          PGresult *res, struct ccase *c)
-{
-  int i;
-  int n_vars = PQnfields (res);
 
   for (i = 0 ; i < n_vars ; ++i )
     {
 
   for (i = 0 ; i < n_vars ; ++i )
     {
-      Oid type = PQftype (res, i);
-      struct variable *v = dict_get_var (r->dict, r->vmap[i]);
+      Oid type = PQftype (r->res, i);
+      const struct variable *v = r->vmap[i];
       union value *val = case_data_rw (c, v);
       union value *val = case_data_rw (c, v);
-      const struct variable *v1 = NULL;
+
       union value *val1 = NULL;
 
       union value *val1 = NULL;
 
-      if (i < r->vmapsize && r->vmap[i] + 1 < dict_get_var_cnt (r->dict))
+      switch (type)
        {
        {
-         v1 = dict_get_var (r->dict, r->vmap[i] + 1);
+       case INTERVALOID:
+       case TIMESTAMPTZOID:
+       case TIMETZOID:
+         if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_var_cnt (r->dict))
+           {
+             const struct variable *v1 = NULL;
+             v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
 
 
-         val1 = case_data_rw (c, v1);
+             val1 = case_data_rw (c, v1);
+           }
+         break;
+       default:
+         break;
        }
 
 
        }
 
 
-      if (PQgetisnull (res, 0, i))
+      if (PQgetisnull (r->res, r->tuple, i))
        {
          value_set_missing (val, var_get_width (v));
 
        {
          value_set_missing (val, var_get_width (v));
 
@@ -532,8 +622,8 @@ set_value (const struct psql_reader *r,
        }
       else
        {
        }
       else
        {
-         const uint8_t *vptr = (const uint8_t *) PQgetvalue (res, 0, i);
-         int length = PQgetlength (res, 0, i);
+         const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
+         int length = PQgetlength (r->res, r->tuple, i);
 
          int var_width = var_get_width (v);
          switch (type)
 
          int var_width = var_get_width (v);
          switch (type)
@@ -743,6 +833,7 @@ set_value (const struct psql_reader *r,
                GET_VALUE (&vptr, sign);
                GET_VALUE (&vptr, dscale);
 
                GET_VALUE (&vptr, sign);
                GET_VALUE (&vptr, dscale);
 
+#if 0
                {
                  struct fmt_spec fmt;
                  fmt.d = dscale;
                {
                  struct fmt_spec fmt;
                  fmt.d = dscale;
@@ -751,6 +842,7 @@ set_value (const struct psql_reader *r,
                  fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
                  var_set_both_formats (v, &fmt);
                }
                  fmt.d =  MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
                  var_set_both_formats (v, &fmt);
                }
+#endif
 
                for (i = 0 ; i < n_digits;  ++i)
                  {
 
                for (i = 0 ; i < n_digits;  ++i)
                  {
@@ -775,6 +867,10 @@ set_value (const struct psql_reader *r,
            }
        }
     }
            }
        }
     }
+
+  r->tuple++;
+
+  return true;
 }
 
 #endif
 }
 
 #endif
index bd83accb73f1c69ec2be870556b8f4e8a65d2b50..2811daa5f7bcacab52e7520a7fb8901b3eb66253 100644 (file)
@@ -28,6 +28,7 @@ struct psql_read_info
   struct string sql;
   bool allow_clear;
   int str_width;
   struct string sql;
   bool allow_clear;
   int str_width;
+  int bsize;
 };
 
 struct dictionary;
 };
 
 struct dictionary;
index 187d6464bab0e2db6f33ccfc960fe05338e46777..21759bbb0946bc1d72e995559816b741cb07fcec 100644 (file)
@@ -1,3 +1,7 @@
+2008-02-06 John Darrington <john@darrington.wattle.id.au>
+
+       * get-data.c: Add a /BSIZE subcommand to PSQL reader.
+
 2008-02-02 John Darrington <john@darrington.wattle.id.au>
 
        * get-data.c (cmd_get_data): Support PSQL type.
 2008-02-02 John Darrington <john@darrington.wattle.id.au>
 
        * get-data.c (cmd_get_data): Support PSQL type.
index 189054a507058c38b17b1ba027d0f69b5a995613..bc09715bd81bdf056d3f5e382933ee79a4a8a32a 100644 (file)
@@ -69,6 +69,7 @@ parse_get_psql (struct lexer *lexer, struct dataset *ds)
   psql.allow_clear = false;
   psql.conninfo = NULL;
   psql.str_width = -1;
   psql.allow_clear = false;
   psql.conninfo = NULL;
   psql.str_width = -1;
+  psql.bsize = -1;
   ds_init_empty (&psql.sql);
 
   lex_force_match (lexer, '/');
   ds_init_empty (&psql.sql);
 
   lex_force_match (lexer, '/');
@@ -93,6 +94,12 @@ parse_get_psql (struct lexer *lexer, struct dataset *ds)
          psql.str_width = lex_integer (lexer);
          lex_get (lexer);
        }
          psql.str_width = lex_integer (lexer);
          lex_get (lexer);
        }
+      else if ( lex_match_id (lexer, "BSIZE"))
+       {
+         lex_match (lexer, '=');
+         psql.bsize = lex_integer (lexer);
+         lex_get (lexer);
+       }
       else if ( lex_match_id (lexer, "UNENCRYPTED"))
        {
          psql.allow_clear = true;
       else if ( lex_match_id (lexer, "UNENCRYPTED"))
        {
          psql.allow_clear = true;
index 6d099a06da56146b8247d41625a37a3f982c968c..6b501025b8b6bd338fabd98bf55b4c1d5ea886e1 100755 (executable)
@@ -30,7 +30,7 @@ cleanup()
        echo "NOT cleaning $TEMPDIR"
        return ; 
     fi
        echo "NOT cleaning $TEMPDIR"
        return ; 
     fi
-    PGHOST=$TEMPDIR $pgpath/pg_ctl -D $TEMPDIR/cluster  stop -w -o "-k $TEMPDIR -h ''"   > /dev/null 2>&1
+    PGHOST=$TEMPDIR $pgpath/pg_ctl -D $TEMPDIR/cluster  stop -W -o "-k $TEMPDIR -h ''"   > /dev/null 2>&1
     rm -rf $TEMPDIR
 }
 
     rm -rf $TEMPDIR
 }
 
@@ -72,9 +72,8 @@ activity="create cluster"
 $pgpath/initdb  -D $TEMPDIR/cluster -A trust > /dev/null
 if [ $? -ne 0 ] ; then no_result ; fi
 
 $pgpath/initdb  -D $TEMPDIR/cluster -A trust > /dev/null
 if [ $? -ne 0 ] ; then no_result ; fi
 
-
 activity="run server"
 activity="run server"
-PGHOST=$TEMPDIR PGPORT=$port $pgpath/pg_ctl -D $TEMPDIR/cluster  start -w -o "-k $TEMPDIR" > /dev/null
+PGHOST=$TEMPDIR PGPORT=$port $pgpath/pg_ctl -D $TEMPDIR/cluster  start -w -o "-k $TEMPDIR -h ''" > /dev/null
 if [ $? -ne 0 ] ; then no_result ; fi
 
 
 if [ $? -ne 0 ] ; then no_result ; fi
 
 
@@ -84,6 +83,14 @@ createdb  -h $TEMPDIR  -p $port $dbase > /dev/null 2> /dev/null
 
 activity="populate database"
 psql  -h $TEMPDIR -p $port  $dbase > /dev/null << EOF
 
 activity="populate database"
 psql  -h $TEMPDIR -p $port  $dbase > /dev/null << EOF
+
+CREATE TABLE empty (a int, b date, c numeric(23, 4));
+
+-- a largeish table to check big queries work ok.
+CREATE TABLE large (x int);
+INSERT INTO large  (select * from generate_series(1, 1000));
+
+
 CREATE TABLE thing (
  bool    bool                      ,
  bytea   bytea                     ,
 CREATE TABLE thing (
  bool    bool                      ,
  bytea   bytea                     ,
@@ -178,7 +185,7 @@ INSERT INTO thing VALUES (
 EOF
 if [ $? -ne 0 ] ; then fail ; fi
 
 EOF
 if [ $? -ne 0 ] ; then fail ; fi
 
-activity="create program"
+activity="create program 1"
 cat > $TESTFILE <<EOF
 GET DATA /TYPE=psql 
        /CONNECT="host=$TEMPDIR port=$port dbname=$dbase"
 cat > $TESTFILE <<EOF
 GET DATA /TYPE=psql 
        /CONNECT="host=$TEMPDIR port=$port dbname=$dbase"
@@ -192,11 +199,11 @@ EOF
 if [ $? -ne 0 ] ; then no_result ; fi
 
 
 if [ $? -ne 0 ] ; then no_result ; fi
 
 
-activity="run program"
+activity="run program 1"
 $SUPERVISOR $PSPP --testing-mode -o raw-ascii $TESTFILE
 if [ $? -ne 0 ] ; then no_result ; fi
 
 $SUPERVISOR $PSPP --testing-mode -o raw-ascii $TESTFILE
 if [ $? -ne 0 ] ; then no_result ; fi
 
-activity="compare output"
+activity="compare output 1"
 perl -pi -e 's/^\s*$//g' $TEMPDIR/pspp.list
 diff -b  $TEMPDIR/pspp.list - << 'EOF'
 1.1 DISPLAY.  
 perl -pi -e 's/^\s*$//g' $TEMPDIR/pspp.list
 diff -b  $TEMPDIR/pspp.list - << 'EOF'
 1.1 DISPLAY.  
@@ -321,4 +328,101 @@ diff -b  $TEMPDIR/pspp.list - << 'EOF'
 EOF
 if [ $? -ne 0 ] ; then fail ; fi
 
 EOF
 if [ $? -ne 0 ] ; then fail ; fi
 
+
+activity="create program 2"
+cat > $TESTFILE <<EOF
+GET DATA /TYPE=psql 
+       /CONNECT="host=$TEMPDIR port=$port dbname=$dbase"
+       /UNENCRYPTED
+       /SQL="select * from empty".
+
+DISPLAY DICTIONARY.
+
+LIST.
+EOF
+if [ $? -ne 0 ] ; then no_result ; fi
+
+activity="run program 2"
+$SUPERVISOR $PSPP --testing-mode -o raw-ascii $TESTFILE
+if [ $? -ne 0 ] ; then no_result ; fi
+
+activity="compare output 2"
+perl -pi -e 's/^\s*$//g' $TEMPDIR/pspp.list
+diff -b  $TEMPDIR/pspp.list - << 'EOF'
+1.1 DISPLAY.  
++--------+-------------------------------------------+--------+
+|Variable|Description                                |Position|
+#========#===========================================#========#
+|a       |Format: F8.2                               |       1|
+|        |Measure: Scale                             |        |
+|        |Display Alignment: Right                   |        |
+|        |Display Width: 8                           |        |
++--------+-------------------------------------------+--------+
+|b       |Format: DATE11                             |       2|
+|        |Measure: Scale                             |        |
+|        |Display Alignment: Right                   |        |
+|        |Display Width: 8                           |        |
++--------+-------------------------------------------+--------+
+|c       |Format: E40.2                              |       3|
+|        |Measure: Scale                             |        |
+|        |Display Alignment: Right                   |        |
+|        |Display Width: 8                           |        |
++--------+-------------------------------------------+--------+
+EOF
+if [ $? -ne 0 ] ; then fail ; fi
+
+activity="create program 3"
+cat > $TESTFILE <<EOF
+GET DATA /TYPE=psql 
+       /CONNECT="host=$TEMPDIR port=$port dbname=$dbase"
+       /UNENCRYPTED
+       /BSIZE = 27
+       /SQL="select * from large".
+
+NUMERIC diff.
+COMPUTE diff = x - lag (x).
+
+TEMPORARY.
+SELECT IF (diff <> 1).
+LIST.
+
+TEMPORARY.
+N OF CASES 6.
+LIST.
+
+SORT CASES BY x (D).
+
+TEMPORARY.
+N OF CASES 6.
+LIST.
+
+EOF
+if [ $? -ne 0 ] ; then no_result ; fi
+
+activity="run program 3"
+$SUPERVISOR $PSPP --testing-mode -o raw-ascii $TESTFILE
+if [ $? -ne 0 ] ; then no_result ; fi
+
+activity="compare output 3"
+perl -pi -e 's/^\s*$//g' $TEMPDIR/pspp.list
+diff -b  $TEMPDIR/pspp.list - << 'EOF'
+       x     diff
+-------- --------
+    1.00      .   
+    2.00     1.00 
+    3.00     1.00 
+    4.00     1.00 
+    5.00     1.00 
+    6.00     1.00 
+       x     diff
+-------- --------
+ 1000.00     1.00 
+  999.00     1.00 
+  998.00     1.00 
+  997.00     1.00 
+  996.00     1.00 
+  995.00     1.00 
+EOF
+if [ $? -ne 0 ] ; then fail ; fi
+
 pass;
 pass;