casereader: Implement save, load.
[pspp] / src / data / casereader.c
index 5ae5a0a8ea4431019660c60e1d56d9d68ed8c3fa..be5509935957adb3c265d90ac496084b2f90b17f 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - a program for statistical analysis.
-   Copyright (C) 2007, 2009 Free Software Foundation, Inc.
+   Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
 
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
 
 #include <config.h>
 
-#include <data/casereader.h>
-#include <data/casereader-provider.h>
+#include "data/casereader.h"
+#include "data/casereader-provider.h"
 
 #include <stdlib.h>
 
-#include <data/casewindow.h>
-#include <data/casewriter.h>
-#include <data/settings.h>
-#include <libpspp/assertion.h>
-#include <libpspp/heap.h>
-#include <libpspp/taint.h>
+#include "data/casereader-shim.h"
+#include "data/casewriter.h"
+#include "libpspp/assertion.h"
+#include "libpspp/heap.h"
+#include "libpspp/taint.h"
 
-#include "xalloc.h"
+#include "gl/xalloc.h"
 
 /* A casereader. */
 struct casereader
@@ -41,8 +40,6 @@ struct casereader
     void *aux;                            /* Auxiliary data for class. */
   };
 
-static void insert_shim (struct casereader *);
-
 /* Reads and returns the next case from READER.  The caller owns
    the returned case and must call case_unref on it when its data
    is no longer needed.  Returns a null pointer if cases have
@@ -114,32 +111,13 @@ casereader_clone (const struct casereader *reader_)
     return NULL;
 
   if (reader->class->clone == NULL)
-    insert_shim (reader);
+    casereader_shim_insert (reader);
   clone = reader->class->clone (reader, reader->aux);
   assert (clone != NULL);
   assert (clone != reader);
   return clone;
 }
 
-/* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
-   *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
-void
-casereader_split (struct casereader *original,
-                  struct casereader **new1, struct casereader **new2)
-{
-  if (new1 != NULL && new2 != NULL)
-    {
-      *new1 = casereader_rename (original);
-      *new2 = casereader_clone (*new1);
-    }
-  else if (new1 != NULL)
-    *new1 = casereader_rename (original);
-  else if (new2 != NULL)
-    *new2 = casereader_rename (original);
-  else
-    casereader_destroy (original);
-}
-
 /* Returns a copy of READER, which is itself destroyed.
    Useful for taking over ownership of a casereader, to enforce
    preventing the original owner from accessing the casereader
@@ -175,7 +153,7 @@ casereader_peek (struct casereader *reader, casenumber idx)
     {
       struct ccase *c;
       if (reader->class->peek == NULL)
-        insert_shim (reader);
+        casereader_shim_insert (reader);
       c = reader->class->peek (reader, reader->aux, idx);
       if (c != NULL)
         return c;
@@ -257,6 +235,20 @@ casereader_get_case_cnt (struct casereader *reader)
   return reader->case_cnt;
 }
 
+static casenumber
+casereader_count_cases__ (const struct casereader *reader,
+                          casenumber max_cases)
+{
+  struct casereader *clone;
+  casenumber n_cases;
+
+  clone = casereader_clone (reader);
+  n_cases = casereader_advance (clone, max_cases);
+  casereader_destroy (clone);
+
+  return n_cases;
+}
+
 /* Returns the number of cases that will be read by successive
    calls to casereader_read for READER, assuming that no errors
    occur.  Upon an error condition, the case count drops to 0, so
@@ -267,25 +259,30 @@ casereader_get_case_cnt (struct casereader *reader)
    of the contents of a clone of READER.  Thus, the return value
    is always correct in the absence of I/O errors. */
 casenumber
-casereader_count_cases (struct casereader *reader)
+casereader_count_cases (const struct casereader *reader)
 {
   if (reader->case_cnt == CASENUMBER_MAX)
     {
-      casenumber n_cases = 0;
-      struct ccase *c;
-
-      struct casereader *clone = casereader_clone (reader);
-
-      for (; (c = casereader_read (clone)) != NULL; case_unref (c))
-        n_cases++;
-
-      casereader_destroy (clone);
-      reader->case_cnt = n_cases;
+      struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
+      reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
     }
-
   return reader->case_cnt;
 }
 
+/* Truncates READER to at most N cases. */
+void
+casereader_truncate (struct casereader *reader, casenumber n)
+{
+  /* This could be optimized, if it ever becomes too expensive, by adding a
+     "max_cases" member to struct casereader.  We could also add a "truncate"
+     function to the casereader implementation, to allow the casereader to
+     throw away data that cannot ever be read. */
+  if (reader->case_cnt == CASENUMBER_MAX)
+    reader->case_cnt = casereader_count_cases__ (reader, n);
+  if (reader->case_cnt > n)
+    reader->case_cnt = n;
+}
+
 /* Returns the prototype for the cases in READER.  The caller
    must not unref the returned prototype. */
 const struct caseproto *
@@ -294,8 +291,28 @@ casereader_get_proto (const struct casereader *reader)
   return reader->proto;
 }
 
+/* Skips past N cases in READER, stopping when the last case in
+   READER has been read or on an input error.  Returns the number
+   of cases successfully skipped. */
+casenumber
+casereader_advance (struct casereader *reader, casenumber n)
+{
+  casenumber i;
+
+  for (i = 0; i < n; i++)
+    {
+      struct ccase *c = casereader_read (reader);
+      if (c == NULL)
+        break;
+      case_unref (c);
+    }
+
+  return i;
+}
+
+
 /* Copies all the cases in READER to WRITER, propagating errors
-   appropriately. */
+   appropriately. READER is destroyed by this function */
 void
 casereader_transfer (struct casereader *reader, struct casewriter *writer)
 {
@@ -308,6 +325,56 @@ casereader_transfer (struct casereader *reader, struct casewriter *writer)
   casereader_destroy (reader);
 }
 
+struct pxd_object *
+casereader_save (const struct casereader *reader_, struct pxd *pxd)
+{
+  struct pxd_array_builder ab;
+  struct casereader *reader;
+  struct pxd_builder b;
+  struct ccase *c;
+
+  pxd_array_builder_init (&ab, pxd);
+  reader = casereader_clone (reader_);
+  while ((c = casereader_read (reader)) != NULL)
+    {
+      pxd_array_builder_add (&ab, case_save (c, pxd));
+      case_unref (c);
+    }
+  casereader_destroy (reader);
+
+  pxd_builder_init (&b, pxd);
+  pxd_builder_put_link (&b, caseproto_save (reader->proto, pxd));
+  pxd_builder_put_link (&b, pxd_array_builder_commit (&ab));
+
+  return pxd_builder_commit (&b);
+}
+
+struct casereader *
+casereader_load (struct pxd_object *object, const struct pxd *pxd)
+{
+  struct casewriter *writer;
+  struct caseproto *proto;
+  struct pxd_array array;
+  struct pxd_parser p;
+  size_t i;
+
+  pxd_parser_init (&p, object, pxd);
+
+  proto = caseproto_load (pxd_parser_get_link (&p), pxd);
+  writer = autopaging_writer_create (proto);
+  caseproto_unref (proto);
+
+  pxd_array_init (&array, pxd_parser_get_link (&p), pxd);
+  for (i = 0; i < pxd_array_size (&array); i++)
+    casewriter_write (writer, case_load (pxd_array_get (&array, i), pxd));
+  pxd_array_destroy (&array);
+
+  pxd_parser_destroy (&p);
+
+  return casewriter_make_reader (writer);
+}
+
+
 /* Creates and returns a new casereader.  This function is
    intended for use by casereader implementations, not by
    casereader clients.
@@ -559,108 +626,42 @@ static const struct casereader_class random_reader_casereader_class =
     random_reader_peek,
   };
 \f
-/* Buffering shim for implementing clone and peek operations.
-
-   The "clone" and "peek" operations aren't implemented by all
-   types of casereaders, but we have to expose a uniform
-   interface anyhow.  We do this by interposing a buffering
-   casereader on top of the existing casereader on the first call
-   to "clone" or "peek".  The buffering casereader maintains a
-   window of cases that spans the positions of the original
-   casereader and all of its clones (the "clone set"), from the
-   position of the casereader that has read the fewest cases to
-   the position of the casereader that has read the most.
-
-   Thus, if all of the casereaders in the clone set are at
-   approximately the same position, only a few cases are buffered
-   and there is little inefficiency.  If, on the other hand, one
-   casereader is not used to read any cases at all, but another
-   one is used to read all of the cases, the entire contents of
-   the casereader is copied into the buffer.  This still might
-   not be so inefficient, given that case data in memory is
-   shared across multiple identical copies, but in the worst case
-   the window implementation will write cases to disk instead of
-   maintaining them in-memory. */
-
-/* A buffering shim for a non-clonable or non-peekable
-   casereader. */
-struct shim
-  {
-    struct casewindow *window;          /* Window of buffered cases. */
-    struct casereader *subreader;       /* Subordinate casereader. */
-  };
+\f
+static const struct casereader_class casereader_null_class;
 
-static const struct casereader_random_class shim_class;
+/* Returns a casereader with no cases.  The casereader has the prototype
+   specified by PROTO.  PROTO may be specified as a null pointer, in which case
+   the casereader has no variables. */
+struct casereader *
+casereader_create_empty (const struct caseproto *proto_)
+{
+  struct casereader *reader;
+  struct caseproto *proto;
 
-/* Interposes a buffering shim atop READER. */
-static void
-insert_shim (struct casereader *reader)
-{
-  const struct caseproto *proto = casereader_get_proto (reader);
-  casenumber case_cnt = casereader_get_case_cnt (reader);
-  struct shim *b = xmalloc (sizeof *b);
-  b->window = casewindow_create (proto, settings_get_workspace_cases (proto));
-  b->subreader = casereader_create_random (proto, case_cnt, &shim_class, b);
-  casereader_swap (reader, b->subreader);
-  taint_propagate (casewindow_get_taint (b->window),
-                   casereader_get_taint (reader));
-  taint_propagate (casereader_get_taint (b->subreader),
-                   casereader_get_taint (reader));
-}
-
-/* Ensures that B's window contains at least CASE_CNT cases.
-   Return true if successful, false upon reaching the end of B's
-   subreader or an I/O error. */
-static bool
-prime_buffer (struct shim *b, casenumber case_cnt)
-{
-  while (casewindow_get_case_cnt (b->window) < case_cnt)
-    {
-      struct ccase *tmp = casereader_read (b->subreader);
-      if (tmp == NULL)
-        return false;
-      casewindow_push_head (b->window, tmp);
-    }
-  return true;
-}
+  proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
+  reader = casereader_create_sequential (NULL, proto, 0,
+                                         &casereader_null_class, NULL);
+  caseproto_unref (proto);
 
-/* Reads the case at the given 0-based OFFSET from the front of
-   the window into C.  Returns the case if successful, or a null
-   pointer if OFFSET is beyond the end of file or upon I/O error.
-   The caller must call case_unref() on the returned case when it
-   is no longer needed. */
-static struct ccase *
-shim_read (struct casereader *reader UNUSED, void *b_,
-           casenumber offset)
-{
-  struct shim *b = b_;
-  if (!prime_buffer (b, offset + 1))
-    return NULL;
-  return casewindow_get_case (b->window, offset);
+  return reader;
 }
 
-/* Destroys B. */
-static void
-shim_destroy (struct casereader *reader UNUSED, void *b_)
+static struct ccase *
+casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
 {
-  struct shim *b = b_;
-  casewindow_destroy (b->window);
-  casereader_destroy (b->subreader);
-  free (b);
+  return NULL;
 }
 
-/* Discards CNT cases from the front of B's window. */
 static void
-shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
+casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
 {
-  struct shim *b = b_;
-  casewindow_pop_tail (b->window, case_cnt);
+  /* Nothing to do. */
 }
 
-/* Class for the buffered reader. */
-static const struct casereader_random_class shim_class =
+static const struct casereader_class casereader_null_class =
   {
-    shim_read,
-    shim_destroy,
-    shim_advance,
+    casereader_null_read,
+    casereader_null_destroy,
+    NULL,                       /* clone */
+    NULL,                       /* peek */
   };