Add casereaders and casewriters, the basis of the new data processing
authorBen Pfaff <blp@gnu.org>
Thu, 7 Jun 2007 05:36:24 +0000 (05:36 +0000)
committerBen Pfaff <blp@gnu.org>
Thu, 7 Jun 2007 05:36:24 +0000 (05:36 +0000)
implementation.  A casereader is a uniform interface to reading cases
from a data source; a casewriter is a uniform interface to writing
cases to a data sink.

src/data/ChangeLog
src/data/automake.mk
src/data/casereader-filter.c [new file with mode: 0644]
src/data/casereader-provider.h [new file with mode: 0644]
src/data/casereader-translator.c [new file with mode: 0644]
src/data/casereader.c [new file with mode: 0644]
src/data/casereader.h [new file with mode: 0644]
src/data/casewriter-provider.h [new file with mode: 0644]
src/data/casewriter-translator.c [new file with mode: 0644]
src/data/casewriter.c [new file with mode: 0644]
src/data/casewriter.h [new file with mode: 0644]

index 1e723f113427089fff06f07b24090d8e35666b1f..54a53a4d022397d1a53c0496a00e07e927f5f22d 100644 (file)
@@ -1,3 +1,30 @@
+2007-06-06  Ben Pfaff  <blp@gnu.org>
+
+       Add casereaders and casewriters, the basis of the new data processing
+       implementation.  A casereader is a uniform interface to reading cases
+       from a data source; a casewriter is a uniform interface to writing
+       cases to a data sink.
+
+       * automake.mk: Add new files.
+       
+       * casereader-filter.c: New file.
+       
+       * casereader-provider.h: New file.
+
+       * casereader-translator.c: New file.
+       
+       * casereader.c: New file.
+       
+       * casereader.h: New file.
+       
+       * casewriter-provider.h: New file.
+       
+       * casewriter-translator.c: New file.
+       
+       * casewriter.c: New file.
+       
+       * casewriter.h: New file.
+
 2007-06-06  Ben Pfaff  <blp@gnu.org>
 
        "casewindow" data structure that extends the deque (from libpspp)
index 9b084692fbb885182e0fbe01fd186c441e49083e..3f2b503f27f70ca168aad749f46de2a8e733b0f8 100644 (file)
@@ -19,8 +19,17 @@ src_data_libdata_a_SOURCES = \
        src/data/casefile.c \
        src/data/casefile-factory.h \
        src/data/casefile-private.h \
+       src/data/casereader-filter.c \
+       src/data/casereader-provider.h \
+       src/data/casereader-translator.c \
+       src/data/casereader.c \
+       src/data/casereader.h \
        src/data/casewindow.c \
        src/data/casewindow.h \
+       src/data/casewriter-provider.h \
+       src/data/casewriter-translator.c \
+       src/data/casewriter.c \
+       src/data/casewriter.h \
        src/data/fastfile.c \
        src/data/fastfile.h \
        src/data/fastfile-factory.h \
diff --git a/src/data/casereader-filter.c b/src/data/casereader-filter.c
new file mode 100644 (file)
index 0000000..88d798e
--- /dev/null
@@ -0,0 +1,244 @@
+/* PSPP - computes sample statistics.
+   Copyright (C) 2007 Free Software Foundation, Inc.
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License as
+   published by the Free Software Foundation; either version 2 of the
+   License, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
+
+#include <config.h>
+
+#include <data/casereader.h>
+
+#include <stdlib.h>
+
+#include <data/casereader-provider.h>
+#include <data/casewriter.h>
+#include <data/variable.h>
+#include <data/dictionary.h>
+#include <libpspp/taint.h>
+#include <libpspp/message.h>
+
+#include "xalloc.h"
+
+#include "gettext.h"
+#define _(msgid) gettext (msgid)
+
+struct casereader_filter 
+  {
+    struct casereader *subreader;
+    bool (*include) (const struct ccase *, void *aux);
+    bool (*destroy) (void *aux);
+    void *aux;
+    struct casewriter *exclude;
+  };
+
+static struct casereader_class casereader_filter_class;
+
+struct casereader *
+casereader_create_filter_func (struct casereader *subreader,
+                               bool (*include) (const struct ccase *,
+                                                void *aux),
+                               bool (*destroy) (void *aux),
+                               void *aux,
+                               struct casewriter *exclude) 
+{
+  struct casereader_filter *filter = xmalloc (sizeof *filter);
+  struct casereader *reader;
+  filter->subreader = casereader_rename (subreader);
+  filter->include = include;
+  filter->destroy = destroy;
+  filter->aux = aux;
+  filter->exclude = exclude;
+  reader = casereader_create_sequential (
+    NULL, casereader_get_value_cnt (filter->subreader), CASENUMBER_MAX,
+    &casereader_filter_class, filter);
+  taint_propagate (casereader_get_taint (filter->subreader),
+                   casereader_get_taint (reader));
+  return reader;
+}
+
+static bool
+casereader_filter_read (struct casereader *reader UNUSED, void *filter_,
+                        struct ccase *c) 
+
+{
+  struct casereader_filter *filter = filter_;
+  for (;;)
+    {
+      if (!casereader_read (filter->subreader, c))
+        return false;
+      else if (filter->include (c, filter->aux)) 
+        return true;
+      else if (filter->exclude != NULL)
+        casewriter_write (filter->exclude, c);
+      else
+        case_destroy (c); 
+    }
+}
+
+static void
+casereader_filter_destroy (struct casereader *reader, void *filter_) 
+{
+  struct casereader_filter *filter = filter_;
+  casereader_destroy (filter->subreader);
+  if (filter->destroy != NULL && !filter->destroy (filter->aux))
+    casereader_force_error (reader);
+  free (filter);
+}
+
+static struct casereader_class casereader_filter_class = 
+  {
+    casereader_filter_read,
+    casereader_filter_destroy,
+
+    /* We could in fact delegate clone to the subreader, if the
+       filter function is required to have no memory and if we
+       added reference counting.  But it might be useful to have
+       filter functions with memory and in any case this would
+       require a little extra work. */
+    NULL,
+    NULL,
+  };
+
+struct casereader_filter_weight 
+  {
+    const struct variable *weight_var;
+    bool *warn_on_invalid;
+    bool local_warn_on_invalid;
+  };
+
+static bool
+casereader_filter_weight_include (const struct ccase *c, void *cfw_) 
+{
+  struct casereader_filter_weight *cfw = cfw_;
+  double value = case_num (c, cfw->weight_var);
+  if (value >= 0.0 && !var_is_num_missing (cfw->weight_var, value, MV_ANY))
+    return true;
+  else
+    {
+      if (*cfw->warn_on_invalid) 
+        {
+         msg (SW, _("At least one case in the data read had a weight value "
+                    "that was user-missing, system-missing, zero, or "
+                    "negative.  These case(s) were ignored."));
+          *cfw->warn_on_invalid = false;
+        }
+      return false;
+    }
+}
+
+static bool
+casereader_filter_weight_destroy (void *cfw_) 
+{
+  struct casereader_filter_weight *cfw = cfw_;
+  free (cfw);
+  return true;
+}
+
+struct casereader *
+casereader_create_filter_weight (struct casereader *reader,
+                                 const struct dictionary *dict,
+                                 bool *warn_on_invalid,
+                                 struct casewriter *exclude) 
+{
+  struct variable *weight_var = dict_get_weight (dict);
+  if (weight_var != NULL) 
+    {
+      struct casereader_filter_weight *cfw = xmalloc (sizeof *cfw);
+      cfw->weight_var = weight_var;
+      cfw->warn_on_invalid = (warn_on_invalid
+                               ? warn_on_invalid
+                               : &cfw->local_warn_on_invalid);
+      cfw->local_warn_on_invalid = true;
+      reader = casereader_create_filter_func (reader,
+                                              casereader_filter_weight_include,
+                                              casereader_filter_weight_destroy,
+                                              cfw, exclude);
+    }
+  else
+    reader = casereader_rename (reader);
+  return reader;
+}
+\f
+struct casereader_filter_missing 
+  {
+    struct variable **vars;
+    size_t var_cnt;
+    enum mv_class class;
+  };
+
+static bool
+casereader_filter_missing_include (const struct ccase *c, void *cfm_) 
+{
+  const struct casereader_filter_missing *cfm = cfm_;
+  size_t i;
+
+  for (i = 0; i < cfm->var_cnt; i++)
+    {
+      struct variable *var = cfm->vars[i];
+      const union value *value = case_data (c, var);
+      if (var_is_value_missing (var, value, cfm->class))
+        return false;
+    }
+  return true;
+}
+
+static bool
+casereader_filter_missing_destroy (void *cfm_) 
+{
+  struct casereader_filter_missing *cfm = cfm_;
+  free (cfm->vars);
+  free (cfm);
+  return true;
+}
+
+struct casereader *
+casereader_create_filter_missing (struct casereader *reader,
+                                  struct variable **vars, size_t var_cnt,
+                                  enum mv_class class,
+                                  struct casewriter *exclude) 
+{
+  if (var_cnt > 0 && class != MV_NEVER) 
+    {
+      struct casereader_filter_missing *cfm = xmalloc (sizeof *cfm);
+      cfm->vars = xmemdup (vars, sizeof *vars * var_cnt);
+      cfm->var_cnt = var_cnt;
+      cfm->class = class;
+      return casereader_create_filter_func (reader,
+                                            casereader_filter_missing_include,
+                                            casereader_filter_missing_destroy,
+                                            cfm,
+                                            exclude);
+    }
+  else
+    return casereader_rename (reader);
+}
+\f
+\f
+static bool
+casereader_counter_include (const struct ccase *c UNUSED, void *counter_) 
+{
+  casenumber *counter = counter_;
+  ++*counter;
+  return true;
+}
+
+struct casereader *
+casereader_create_counter (struct casereader *reader, casenumber *counter,
+                           casenumber initial_value) 
+{
+  *counter = initial_value;
+  return casereader_create_filter_func (reader, casereader_counter_include,
+                                        NULL, counter, NULL);
+}
diff --git a/src/data/casereader-provider.h b/src/data/casereader-provider.h
new file mode 100644 (file)
index 0000000..2726f68
--- /dev/null
@@ -0,0 +1,161 @@
+/* PSPP - computes sample statistics.
+   Copyright (C) 2007 Free Software Foundation, Inc.
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License as
+   published by the Free Software Foundation; either version 2 of the
+   License, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
+
+/* Definitions needed to implement a new type of casereader.
+   Code that only uses casereaders does not need this header.
+
+   Two functions to create casereaders are supplied:
+
+        - casereader_create_sequential, to create a casereader
+          for a data source that is naturally sequential.  The
+          casereader layer will automatically, as needed,
+          simulate the ability to access cases randomly.
+
+        - casereader_create_random, to create a casereader for a
+          data source that supports random access to cases.  (This
+          function is in fact implemented as a set of wrappers
+          around casereader_create_sequential.)
+
+   Which function is used has no effect on the set of operations
+   that may be performed on the resulting casereader, only on how
+   the casereader is implemented internally. */
+
+#ifndef DATA_CASEREADER_PROVIDER_H
+#define DATA_CASEREADER_PROVIDER_H 1
+
+#include <data/casereader.h>
+
+/* Casereader class for sequential data sources. */
+struct casereader_class 
+  {
+    /* Mandatory.
+
+       Reads the next case from READER into case C, which the
+       casereader must create and which the client is responsible
+       for destroying.  If successful, returns true and advances
+       READER to the next case, so that the next call to this
+       function will read the next case.  The case just read will
+       never be read again by a call to this function for READER.
+
+       At end of file or upon an I/O error, returns false.  After
+       false is returned once, this function will not be called
+       again for the given READER.
+
+       If an I/O error occurs, this function should call
+       casereader_force_error on READER. */
+    bool (*read) (struct casereader *reader, void *aux, struct ccase *c);
+
+    /* Mandatory.
+
+       Destroys READER.
+
+       If an I/O error is detected during destruction, this
+       function should call casereader_force_error on READER. */
+    void (*destroy) (struct casereader *reader, void *aux);
+
+    /* Optional: if convenient and efficiently implementable,
+       supply this function as an optimization for use by
+       casereader_clone.  (But it might be easier to use the
+       random-access casereader wrapper instead.)
+
+       Creates and returns a clone of READER.  The clone must
+       read the same case data in the same sequence as READER,
+       starting from the same position.  The only allowable
+       exception to this rule is that I/O errors may force the
+       clone or the original casereader to stop reading after
+       differing numbers of cases.
+
+       The clone should have a clone of READER's taint object,
+       accomplished by passing casereader_get_taint (READER) to
+       casereader_create. */
+    struct casereader *(*clone) (struct casereader *reader, void *aux);
+
+    /* Optional: if convenient and efficiently implementable,
+       supply as an optimization for use by casereader_peek.
+       (But it might be easier to use the random-access
+       casereader wrapper instead.)
+
+       Reads the case at 0-based offset IDX from the beginning of
+       READER into case C, which the casereader must create and
+       which the client is responsible for destroying.
+
+       At end of file or upon an I/O error, returns false.  If
+       this function returns false, then it will never be called
+       again for an equal or greater value of IDX, and the "read"
+       member function will never be called to advance as far as
+       IDX cases further into the casereader.  That is, returning
+       false indicates that the casereader has fewer than IDX
+       cases left.
+
+       If an I/O error occurs, this function should call
+       casereader_force_error on READER. */
+    bool (*peek) (struct casereader *reader, void *aux, casenumber idx,
+                  struct ccase *c);
+  };
+
+struct casereader *
+casereader_create_sequential (const struct taint *,
+                              size_t value_cnt, casenumber case_cnt,
+                              const struct casereader_class *, void *);
+\f
+/* Casereader class for random-access data sources. */
+struct casereader_random_class
+  {
+    /* Mandatory.
+
+       Reads the case at 0-based offset IDX from the beginning of
+       READER into case C, which the casereader must create and
+       which the client is responsible for destroying.
+
+       At end of file or upon an I/O error, returns false.  If
+       this function returns false, then it will never be called
+       again for an equal or greater value of IDX, and the "read"
+       member function will never be called to advance as far as
+       IDX cases further into the casereader.  That is, returning
+       false indicates that the casereader has fewer than IDX
+       cases.
+
+       If an I/O error occurs, this function should call
+       casereader_force_error on READER. */
+    bool (*read) (struct casereader *reader, void *aux, casenumber idx,
+                  struct ccase *c);
+
+    /* Mandatory.
+
+       Destroys READER.
+
+       If an I/O error is detected during destruction, this
+       function should call casereader_force_error on READER. */
+    void (*destroy) (struct casereader *reader, void *aux);
+
+    /* Mandatory.
+       
+       A call to this function tells the callee that the CNT
+       cases at the beginning of READER will never be read again.
+       The casereader implementation should free any resources
+       associated with those cases.  After this function returns,
+       the IDX argument in future calls to the "read" function
+       will be relative to remaining cases. */
+    void (*advance) (struct casereader *reader, void *aux, casenumber cnt);
+  };
+
+struct casereader *
+casereader_create_random (size_t value_cnt, casenumber case_cnt,
+                          const struct casereader_random_class *, void *aux);
+
+#endif /* data/casereader-provider.h */
diff --git a/src/data/casereader-translator.c b/src/data/casereader-translator.c
new file mode 100644 (file)
index 0000000..f41dbb5
--- /dev/null
@@ -0,0 +1,96 @@
+/* PSPP - computes sample statistics.
+   Copyright (C) 2007 Free Software Foundation, Inc.
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License as
+   published by the Free Software Foundation; either version 2 of the
+   License, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
+
+#include <config.h>
+
+#include <data/casereader.h>
+
+#include <stdlib.h>
+
+#include <data/casereader-provider.h>
+#include <libpspp/taint.h>
+
+#include "xalloc.h"
+
+struct casereader_translator
+  {
+    struct casereader *subreader;
+
+    void (*translate) (const struct ccase *input, struct ccase *output,
+                       void *aux);
+    bool (*destroy) (void *aux);
+    void *aux;
+  };
+
+static struct casereader_class casereader_translator_class;
+
+struct casereader *
+casereader_create_translator (struct casereader *subreader,
+                              size_t output_value_cnt,
+                              void (*translate) (const struct ccase *input,
+                                                 struct ccase *output,
+                                                 void *aux),
+                              bool (*destroy) (void *aux),
+                              void *aux) 
+{
+  struct casereader_translator *ct = xmalloc (sizeof *ct);
+  struct casereader *reader;
+  ct->subreader = casereader_rename (subreader);
+  ct->translate = translate;
+  ct->destroy = destroy;
+  ct->aux = aux;
+  reader = casereader_create_sequential (
+    NULL, output_value_cnt, casereader_get_case_cnt (ct->subreader),
+    &casereader_translator_class, ct);
+  taint_propagate (casereader_get_taint (ct->subreader),
+                   casereader_get_taint (reader));
+  return reader;
+}
+
+static bool
+casereader_translator_read (struct casereader *reader UNUSED,
+                            void *ct_, struct ccase *c) 
+{
+  struct casereader_translator *ct = ct_;
+  struct ccase tmp;
+
+  if (casereader_read (ct->subreader, &tmp)) 
+    {
+      ct->translate (&tmp, c, ct->aux);
+      return true; 
+    }
+  else
+    return false;
+}
+
+static void
+casereader_translator_destroy (struct casereader *reader UNUSED, void *ct_) 
+{
+  struct casereader_translator *ct = ct_;
+  casereader_destroy (ct->subreader);
+  ct->destroy (ct->aux);
+  free (ct);
+}
+
+static struct casereader_class casereader_translator_class = 
+  {
+    casereader_translator_read,
+    casereader_translator_destroy,
+    NULL,
+    NULL,
+  };
diff --git a/src/data/casereader.c b/src/data/casereader.c
new file mode 100644 (file)
index 0000000..da49e03
--- /dev/null
@@ -0,0 +1,605 @@
+/* PSPP - computes sample statistics.
+   Copyright (C) 2007 Free Software Foundation, Inc.
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License as
+   published by the Free Software Foundation; either version 2 of the
+   License, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
+
+#include <config.h>
+
+#include <data/casereader.h>
+#include <data/casereader-provider.h>
+
+#include <stdlib.h>
+
+#include <data/casewindow.h>
+#include <data/casewriter.h>
+#include <data/settings.h>
+#include <libpspp/assertion.h>
+#include <libpspp/heap.h>
+#include <libpspp/taint.h>
+
+#include "xalloc.h"
+
+/* A casereader. */
+struct casereader 
+  {
+    struct taint *taint;                  /* Corrupted? */
+    size_t value_cnt;                     /* Values per case. */
+    casenumber case_cnt;                  /* Number of cases,
+                                             CASENUMBER_MAX if unknown. */
+    const struct casereader_class *class; /* Class. */
+    void *aux;                            /* Auxiliary data for class. */
+  };
+
+static void insert_shim (struct casereader *);
+
+/* Creates a new case in C and reads the next case from READER
+   into it.  The caller owns C and must destroy C when its data
+   is no longer needed.  Return true if successful, false when
+   cases have been exhausted or upon detection of an I/O error.
+   In the latter case, C is set to the null case.
+
+   The case returned is effectively consumed: it can never be
+   read again through READER.  If this is inconvenient, READER
+   may be cloned in advance with casereader_clone, or
+   casereader_peek may be used instead. */
+bool
+casereader_read (struct casereader *reader, struct ccase *c) 
+{
+  if (reader->case_cnt != 0 && reader->class->read (reader, reader->aux, c)) 
+    {
+      assert (case_get_value_cnt (c) == reader->value_cnt);
+      if (reader->case_cnt != CASENUMBER_MAX)
+        reader->case_cnt--;
+      return true; 
+    }
+  else 
+    {
+      reader->case_cnt = 0;
+      case_nullify (c);
+      return false; 
+    }
+}
+
+/* Destroys READER.
+   Returns false if an I/O error was detected on READER, true
+   otherwise. */
+bool
+casereader_destroy (struct casereader *reader) 
+{
+  bool ok = true;
+  if (reader != NULL) 
+    {
+      reader->class->destroy (reader, reader->aux);
+      ok = taint_destroy (reader->taint);
+      free (reader);
+    }
+  return ok;
+}
+
+/* Returns a clone of READER.  READER and its clone may be used
+   to read the same sequence of cases in the same order, barring
+   I/O errors. */
+struct casereader *
+casereader_clone (const struct casereader *reader_) 
+{
+  struct casereader *reader = (struct casereader *) reader_;
+  struct casereader *clone;
+
+  if (reader->class->clone == NULL)
+    insert_shim (reader);
+  clone = reader->class->clone (reader, reader->aux);
+  assert (clone != NULL);
+  assert (clone != reader);
+  return clone;
+}
+
+/* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
+   *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
+void
+casereader_split (struct casereader *original,
+                  struct casereader **new1, struct casereader **new2) 
+{
+  if (new1 != NULL && new2 != NULL) 
+    {
+      *new1 = casereader_rename (original);
+      *new2 = casereader_clone (*new1);
+    }
+  else if (new1 != NULL)
+    *new1 = casereader_rename (original);
+  else if (new2 != NULL)
+    *new2 = casereader_rename (original);
+  else
+    casereader_destroy (original);
+}
+
+/* Returns a copy of READER, which is itself destroyed.
+   Useful for taking over ownership of a casereader, to enforce
+   preventing the original owner from accessing the casereader
+   again. */
+struct casereader *
+casereader_rename (struct casereader *reader) 
+{
+  struct casereader *new = xmemdup (reader, sizeof *reader);
+  free (reader);
+  return new;
+}
+
+/* Exchanges the casereaders referred to by A and B. */
+void
+casereader_swap (struct casereader *a, struct casereader *b) 
+{
+  if (a != b) 
+    {
+      struct casereader tmp = *a;
+      *a = *b;
+      *b = tmp;
+    }
+}
+
+/* Creates a new case in C and reads the (IDX + 1)'th case from
+   READER into it.  The caller owns C and must destroy C when its
+   data is no longer needed.  Return true if successful, false
+   when cases have been exhausted or upon detection of an I/O
+   error.  In the latter case, C is set to the null case. */
+bool
+casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
+{
+  if (idx < reader->case_cnt)
+    {
+      if (reader->class->peek == NULL)
+        insert_shim (reader);
+      if (reader->class->peek (reader, reader->aux, idx, c))
+        return true;
+      else if (casereader_error (reader)) 
+        reader->case_cnt = 0;
+    }
+  if (reader->case_cnt > idx)
+    reader->case_cnt = idx;
+  case_nullify (c);
+  return false;
+}
+
+/* Returns true if an I/O error or another hard error has
+   occurred on READER, a clone of READER, or on some object on
+   which READER's data has a dependency, false otherwise. */
+bool
+casereader_error (const struct casereader *reader) 
+{
+  return taint_is_tainted (reader->taint);
+}
+
+/* Marks READER as having encountered an error.
+
+   Ordinarily, this function should be called by the
+   implementation of a casereader, not by the casereader's
+   client.  Instead, casereader clients should usually ensure
+   that a casereader's error state is correct by using
+   taint_propagate to propagate to the casereader's taint
+   structure, which may be obtained via casereader_get_taint. */
+void
+casereader_force_error (struct casereader *reader) 
+{
+  taint_set_taint (reader->taint);
+}
+
+/* Returns READER's associate taint object, for use with
+   taint_propagate and other taint functions. */
+const struct taint *
+casereader_get_taint (const struct casereader *reader) 
+{
+  return reader->taint;
+}
+
+/* Returns the number of cases that will be read by successive
+   calls to casereader_read for READER, assuming that no errors
+   occur.  Upon an error condition, the case count drops to 0, so
+   that no more cases can be obtained.
+
+   Not all casereaders can predict the number of cases that they
+   will produce without actually reading all of them.  In that
+   case, this function returns CASENUMBER_MAX.  To obtain the
+   actual number of cases in such a casereader, use
+   casereader_count_cases. */
+casenumber
+casereader_get_case_cnt (struct casereader *reader) 
+{
+  return reader->case_cnt;
+}
+
+/* Returns the number of cases that will be read by successive
+   calls to casereader_read for READER, assuming that no errors
+   occur.  Upon an error condition, the case count drops to 0, so
+   that no more cases can be obtained.
+
+   For a casereader that cannot predict the number of cases it
+   will produce, this function actually reads (and discards) all
+   of the contents of a clone of READER.  Thus, the return value
+   is always correct in the absence of I/O errors. */
+casenumber
+casereader_count_cases (struct casereader *reader)
+{
+  if (reader->case_cnt == CASENUMBER_MAX)
+    {
+      casenumber n_cases = 0;
+      struct ccase c;
+
+      struct casereader *clone = casereader_clone (reader);
+
+      for (; casereader_read (clone, &c); case_destroy (&c))
+        n_cases++;
+
+      casereader_destroy (clone);
+      reader->case_cnt = n_cases;
+    }
+
+  return reader->case_cnt;
+}
+
+/* Returns the number of struct values in each case in READER. */
+size_t
+casereader_get_value_cnt (struct casereader *reader) 
+{
+  return reader->value_cnt;
+}
+
+/* Copies all the cases in READER to WRITER, propagating errors
+   appropriately. */
+void
+casereader_transfer (struct casereader *reader, struct casewriter *writer)
+{
+  struct ccase c;
+
+  taint_propagate (casereader_get_taint (reader),
+                   casewriter_get_taint (writer));
+  while (casereader_read (reader, &c))
+    casewriter_write (writer, &c);
+  casereader_destroy (reader);
+}
+
+/* Creates and returns a new casereader.  This function is
+   intended for use by casereader implementations, not by
+   casereader clients.
+
+   This function is most suited for creating a casereader for a
+   data source that is naturally sequential.
+   casereader_create_random may be more appropriate for a data
+   source that supports random access.
+
+   Ordinarily, specify a null pointer for TAINT, in which case
+   the new casereader will have a new, unique taint object.  If
+   the new casereader should have a clone of an existing taint
+   object, specify that object as TAINT.  (This is most commonly
+   useful in an implementation of the "clone" casereader_class
+   function, in which case the cloned casereader should have the
+   same taint object as the original casereader.)
+
+   VALUE_CNT must be the number of struct values per case read
+   from the casereader.
+
+   CASE_CNT is an upper limit on the number of cases that
+   casereader_read will return from the casereader in successive
+   calls.  Ordinarily, this is the actual number of cases in the
+   data source or CASENUMBER_MAX if the number of cases cannot be
+   predicted in advance.
+
+   CLASS and AUX are a set of casereader implementation-specific
+   member functions and auxiliary data to pass to those member
+   functions, respectively. */
+struct casereader *
+casereader_create_sequential (const struct taint *taint,
+                              size_t value_cnt, casenumber case_cnt,
+                              const struct casereader_class *class, void *aux) 
+{
+  struct casereader *reader = xmalloc (sizeof *reader);
+  reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
+  reader->value_cnt = value_cnt;
+  reader->case_cnt = case_cnt;
+  reader->class = class;
+  reader->aux = aux;
+  return reader;
+}
+\f
+/* Random-access casereader implementation.
+
+   This is a set of wrappers around casereader_create_sequential
+   and struct casereader_class to make it easy to create
+   efficient casereaders for data sources that natively support
+   random access. */
+
+/* One clone of a random reader. */
+struct random_reader
+  {
+    struct random_reader_shared *shared; /* Data shared among clones. */
+    struct heap_node heap_node; /* Node in shared data's heap of readers. */
+    casenumber offset;          /* Number of cases already read. */
+  };
+
+/* Returns the random_reader in which the given heap_node is
+   embedded. */
+static struct random_reader *
+random_reader_from_heap_node (const struct heap_node *node) 
+{
+  return heap_data (node, struct random_reader, heap_node);
+}
+
+/* Data shared among clones of a random reader. */
+struct random_reader_shared 
+  {
+    struct heap *readers;       /* Heap of struct random_readers. */
+    casenumber min_offset;      /* Smallest offset of any random_reader. */
+    const struct casereader_random_class *class;
+    void *aux;
+  };
+
+static struct casereader_class random_reader_casereader_class;
+
+/* Creates and returns a new random_reader with the given SHARED
+   data and OFFSET.  Inserts the new random reader into the
+   shared heap. */
+static struct random_reader *
+make_random_reader (struct random_reader_shared *shared, casenumber offset)
+{
+  struct random_reader *br = xmalloc (sizeof *br);
+  br->offset = offset;
+  br->shared = shared;
+  heap_insert (shared->readers, &br->heap_node);
+  return br;
+}
+
+/* Compares random_readers A and B by offset and returns a
+   strcmp()-like result. */
+static int
+compare_random_readers_by_offset (const struct heap_node *a_,
+                                  const struct heap_node *b_,
+                                  const void *aux UNUSED) 
+{
+  const struct random_reader *a = random_reader_from_heap_node (a_);
+  const struct random_reader *b = random_reader_from_heap_node (b_);
+  return a->offset < b->offset ? -1 : a->offset > b->offset;
+}
+
+/* Creates and returns a new casereader.  This function is
+   intended for use by casereader implementations, not by
+   casereader clients.
+
+   This function is most suited for creating a casereader for a
+   data source that supports random access.
+   casereader_create_sequential is more appropriate for a data
+   source that is naturally sequential.
+
+   VALUE_CNT must be the number of struct values per case read
+   from the casereader.
+
+   CASE_CNT is an upper limit on the number of cases that
+   casereader_read will return from the casereader in successive
+   calls.  Ordinarily, this is the actual number of cases in the
+   data source or CASENUMBER_MAX if the number of cases cannot be
+   predicted in advance.
+
+   CLASS and AUX are a set of casereader implementation-specific
+   member functions and auxiliary data to pass to those member
+   functions, respectively. */
+struct casereader *
+casereader_create_random (size_t value_cnt, casenumber case_cnt,
+                          const struct casereader_random_class *class,
+                          void *aux) 
+{
+  struct random_reader_shared *shared = xmalloc (sizeof *shared);
+  shared->readers = heap_create (compare_random_readers_by_offset, NULL);
+  shared->class = class;
+  shared->aux = aux;
+  shared->min_offset = 0;
+  return casereader_create_sequential (NULL, value_cnt, case_cnt,
+                                       &random_reader_casereader_class,
+                                       make_random_reader (shared, 0));
+}
+
+/* Reassesses the min_offset in SHARED based on the minimum
+   offset in the heap.   */
+static void
+advance_random_reader (struct casereader *reader,
+                       struct random_reader_shared *shared) 
+{
+  casenumber old, new;
+
+  old = shared->min_offset;
+  new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
+  assert (new >= old);
+  if (new > old)
+    {
+      shared->min_offset = new;
+      shared->class->advance (reader, shared->aux, new - old);
+    }
+}
+
+/* struct casereader_class "read" function for random reader. */
+static bool
+random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
+{
+  struct random_reader *br = br_;
+  struct random_reader_shared *shared = br->shared;
+
+  if (shared->class->read (reader, shared->aux,
+                           br->offset - shared->min_offset, c)) 
+    {
+      br->offset++;
+      heap_changed (shared->readers, &br->heap_node);
+      advance_random_reader (reader, shared); 
+      return true;
+    }
+  else
+    return false; 
+}
+
+/* struct casereader_class "destroy" function for random
+   reader. */
+static void
+random_reader_destroy (struct casereader *reader, void *br_) 
+{
+  struct random_reader *br = br_;
+  struct random_reader_shared *shared = br->shared;
+
+  heap_delete (shared->readers, &br->heap_node);
+  if (heap_is_empty (shared->readers)) 
+    {
+      heap_destroy (shared->readers);
+      shared->class->destroy (reader, shared->aux);
+      free (shared);
+    }
+  else
+    advance_random_reader (reader, shared);
+
+  free (br);
+}
+
+/* struct casereader_class "clone" function for random reader. */
+static struct casereader *
+random_reader_clone (struct casereader *reader, void *br_) 
+{
+  struct random_reader *br = br_;
+  struct random_reader_shared *shared = br->shared;
+  return casereader_create_sequential (casereader_get_taint (reader),
+                                       casereader_get_value_cnt (reader),
+                                       casereader_get_case_cnt (reader),
+                                       &random_reader_casereader_class,
+                                       make_random_reader (shared,
+                                                           br->offset));
+}
+
+/* struct casereader_class "peek" function for random reader. */
+static bool
+random_reader_peek (struct casereader *reader, void *br_,
+                    casenumber idx, struct ccase *c) 
+{
+  struct random_reader *br = br_;
+  struct random_reader_shared *shared = br->shared;
+
+  return shared->class->read (reader, shared->aux,
+                              br->offset - shared->min_offset + idx, c);
+}
+
+/* Casereader class for random reader. */
+static struct casereader_class random_reader_casereader_class = 
+  {
+    random_reader_read,
+    random_reader_destroy,
+    random_reader_clone,
+    random_reader_peek,
+  };
+\f
+/* Buffering shim for implementing clone and peek operations.
+
+   The "clone" and "peek" operations aren't implemented by all
+   types of casereaders, but we have to expose a uniform
+   interface anyhow.  We do this by interposing a buffering
+   casereader on top of the existing casereader on the first call
+   to "clone" or "peek".  The buffering casereader maintains a
+   window of cases that spans the positions of the original
+   casereader and all of its clones (the "clone set"), from the
+   position of the casereader that has read the fewest cases to
+   the position of the casereader that has read the most.  
+
+   Thus, if all of the casereaders in the clone set are at
+   approximately the same position, only a few cases are buffered
+   and there is little inefficiency.  If, on the other hand, one
+   casereader is not used to read any cases at all, but another
+   one is used to read all of the cases, the entire contents of
+   the casereader is copied into the buffer.  This still might
+   not be so inefficient, given that case data in memory is
+   shared across multiple identical copies, but in the worst case
+   the window implementation will write cases to disk instead of
+   maintaining them in-memory. */
+
+/* A buffering shim for a non-clonable or non-peekable
+   casereader. */
+struct shim 
+  {
+    struct casewindow *window;          /* Window of buffered cases. */
+    struct casereader *subreader;       /* Subordinate casereader. */
+  };
+
+static struct casereader_random_class shim_class;
+
+/* Interposes a buffering shim atop READER. */
+static void
+insert_shim (struct casereader *reader) 
+{
+  size_t value_cnt = casereader_get_value_cnt (reader);
+  casenumber case_cnt = casereader_get_case_cnt (reader);
+  struct shim *b = xmalloc (sizeof *b);
+  b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt));
+  b->subreader = casereader_create_random (value_cnt, case_cnt,
+                                           &shim_class, b);
+  casereader_swap (reader, b->subreader);
+  taint_propagate (casewindow_get_taint (b->window),
+                   casereader_get_taint (reader));
+  taint_propagate (casereader_get_taint (b->subreader),
+                   casereader_get_taint (reader));
+}
+
+/* Ensures that B's window contains at least CASE_CNT cases.
+   Return true if successful, false upon reaching the end of B's
+   subreader or an I/O error. */
+static bool
+prime_buffer (struct shim *b, casenumber case_cnt) 
+{
+  while (casewindow_get_case_cnt (b->window) < case_cnt) 
+    {
+      struct ccase tmp;
+      if (!casereader_read (b->subreader, &tmp))
+        return false;
+      casewindow_push_head (b->window, &tmp);
+    }
+  return true;
+}
+
+/* Reads the case at the given 0-based OFFSET from the front of
+   the window into C.  Returns true if successful, false if
+   OFFSET is beyond the end of file or upon I/O error. */
+static bool
+shim_read (struct casereader *reader UNUSED, void *b_,
+           casenumber offset, struct ccase *c) 
+{
+  struct shim *b = b_;
+  return (prime_buffer (b, offset + 1)
+          && casewindow_get_case (b->window, offset, c));
+}
+
+/* Destroys B. */
+static void
+shim_destroy (struct casereader *reader UNUSED, void *b_) 
+{
+  struct shim *b = b_;
+  casewindow_destroy (b->window);
+  casereader_destroy (b->subreader);
+  free (b);
+}
+
+/* Discards CNT cases from the front of B's window. */
+static void
+shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
+{
+  struct shim *b = b_;
+  casewindow_pop_tail (b->window, case_cnt);
+}
+
+/* Class for the buffered reader. */
+static struct casereader_random_class shim_class = 
+  {
+    shim_read,
+    shim_destroy,
+    shim_advance,
+  };
diff --git a/src/data/casereader.h b/src/data/casereader.h
new file mode 100644 (file)
index 0000000..bd066a9
--- /dev/null
@@ -0,0 +1,116 @@
+/* PSPP - computes sample statistics.
+   Copyright (C) 2007 Free Software Foundation, Inc.
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License as
+   published by the Free Software Foundation; either version 2 of the
+   License, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
+
+/* Casereader client interface.
+
+   A casereader abstracts interfaces through which cases may be
+   read.  A casereader may be a front-end for a system file, a
+   portable file, the active file in a data set, or anything else
+   on which a casereader interface has been overlaid.  Casereader
+   layering, in which a casereader acts as a filter or translator
+   on top of another casereader, is also supported.
+
+   There is no central interface for obtaining casereaders: a
+   casereader for reading a system file is obtained from the
+   system file reading module, and so on.  Once a casereader has
+   been obtained, by whatever means, the interface to it is
+   uniform.  The most important functions for casereader usage
+   are:
+
+     - casereader_read: Reads a case from the casereader.  The
+       case is consumed and cannot be read again.  The caller is
+       responsible for destroying the case.
+
+     - casereader_clone: Makes a copy of a casereader.  May be
+       used to read one or a set of cases from a casereader
+       repeatedly.
+
+     - casereader_destroy: Destroys a casereader.
+
+   Casereaders can encounter error conditions, such as I/O
+   errors, as they read cases.  Error conditions prevent any more
+   cases from being read from the casereader.  Error conditions
+   are reported by casereader_error.  Error condition may be
+   propagated to or from a casereader with taint_propagate using
+   the casereader's taint object, which may be obtained with
+   casereader_get_taint. */
+
+#ifndef DATA_CASEREADER_H
+#define DATA_CASEREADER_H 1
+
+#include <libpspp/compiler.h>
+#include <data/case.h>
+#include <data/missing-values.h>
+
+struct dictionary;
+struct casereader;
+struct casewriter;
+
+bool casereader_read (struct casereader *, struct ccase *);
+bool casereader_destroy (struct casereader *);
+
+struct casereader *casereader_clone (const struct casereader *);
+void casereader_split (struct casereader *,
+                       struct casereader **, struct casereader **);
+struct casereader *casereader_rename (struct casereader *);
+void casereader_swap (struct casereader *, struct casereader *);
+
+bool casereader_peek (struct casereader *, casenumber, struct ccase *)
+     WARN_UNUSED_RESULT;
+
+bool casereader_error (const struct casereader *);
+void casereader_force_error (struct casereader *);
+const struct taint *casereader_get_taint (const struct casereader *);
+
+casenumber casereader_get_case_cnt (struct casereader *);
+casenumber casereader_count_cases (struct casereader *);
+size_t casereader_get_value_cnt (struct casereader *);
+
+void casereader_transfer (struct casereader *, struct casewriter *);
+\f
+struct casereader *
+casereader_create_filter_func (struct casereader *,
+                               bool (*include) (const struct ccase *,
+                                                void *aux),
+                               bool (*destroy) (void *aux),
+                               void *aux,
+                               struct casewriter *exclude);
+struct casereader *
+casereader_create_filter_weight (struct casereader *,
+                                 const struct dictionary *dict,
+                                 bool *warn_on_invalid,
+                                 struct casewriter *exclude);
+struct casereader *
+casereader_create_filter_missing (struct casereader *,
+                                  struct variable **vars, size_t var_cnt,
+                                  enum mv_class,
+                                  struct casewriter *exclude);
+
+struct casereader *
+casereader_create_counter (struct casereader *, casenumber *counter,
+                           casenumber initial_value);
+
+struct casereader *
+casereader_create_translator (struct casereader *, size_t output_value_cnt,
+                              void (*translate) (const struct ccase *input,
+                                                 struct ccase *output,
+                                                 void *aux),
+                              bool (*destroy) (void *aux),
+                              void *aux);
+
+#endif /* data/casereader.h */
diff --git a/src/data/casewriter-provider.h b/src/data/casewriter-provider.h
new file mode 100644 (file)
index 0000000..1f4fcab
--- /dev/null
@@ -0,0 +1,63 @@
+/* PSPP - computes sample statistics.
+   Copyright (C) 2006 Free Software Foundation, Inc.
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License as
+   published by the Free Software Foundation; either version 2 of the
+   License, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
+
+#ifndef DATA_CASEWRITER_PROVIDER_H
+#define DATA_CASEWRITER_PROVIDER_H 1
+
+#include <data/casewriter.h>
+
+struct casewriter_class 
+  {
+    /* Mandatory.
+
+       Writes case C to WRITER.  Destroys C before returning.
+
+       If an I/O error occurs, this function should call
+       casewriter_force_error on WRITER.  Some I/O error
+       detection may be deferred to the "destroy" member function
+       (e.g. writes to disk need not be flushed by "write") . */
+    void (*write) (struct casewriter *writer, void *aux, struct ccase *c);
+
+    /* Mandatory.
+
+       Finalizes output and destroys WRITER.
+
+       If an I/O error is detected while finalizing output
+       (e.g. while flushing output to disk), this function should
+       call casewriter_force_error on WRITER. */
+    void (*destroy) (struct casewriter *writer, void *aux);
+
+    /* Optional: supply if practical and desired by clients.
+
+       Finalizes output to WRITER, destroys WRITER, and in its
+       place returns a casereader that can be used to read back
+       the data written to WRITER.  WRITER will not be used again
+       after calling this function, even as an argument to
+       casewriter_destroy.
+
+       If an I/O error is detected while finalizing output
+       (e.g. while flushing output to disk), this function should
+       call casewriter_force_error on WRITER.  The caller will
+       ensure that the error is propagated to the returned
+       casereader. */
+    struct casereader *(*convert_to_reader) (struct casewriter *, void *aux);
+  };
+
+struct casewriter *casewriter_create (const struct casewriter_class *, void *);
+
+#endif /* data/casewriter-provider.h */
diff --git a/src/data/casewriter-translator.c b/src/data/casewriter-translator.c
new file mode 100644 (file)
index 0000000..0f47f76
--- /dev/null
@@ -0,0 +1,98 @@
+/* PSPP - computes sample statistics.
+   Copyright (C) 2007 Free Software Foundation, Inc.
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License as
+   published by the Free Software Foundation; either version 2 of the
+   License, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
+
+#include <config.h>
+
+#include <data/casewriter.h>
+#include <data/casewriter-provider.h>
+
+#include <stdlib.h>
+
+#include <libpspp/taint.h>
+
+#include "xalloc.h"
+
+struct casewriter_translator
+  {
+    struct casewriter *subwriter;
+
+    void (*translate) (const struct ccase *input, struct ccase *output,
+                       void *aux);
+    bool (*destroy) (void *aux);
+    void *aux;
+  };
+
+static struct casewriter_class casewriter_translator_class;
+
+struct casewriter *
+casewriter_create_translator (struct casewriter *subwriter,
+                              void (*translate) (const struct ccase *input,
+                                                 struct ccase *output,
+                                                 void *aux),
+                              bool (*destroy) (void *aux),
+                              void *aux) 
+{
+  struct casewriter_translator *ct = xmalloc (sizeof *ct);
+  struct casewriter *writer;
+  ct->subwriter = casewriter_rename (subwriter);
+  ct->translate = translate;
+  ct->destroy = destroy;
+  ct->aux = aux;
+  writer = casewriter_create (&casewriter_translator_class, ct);
+  taint_propagate (casewriter_get_taint (ct->subwriter),
+                   casewriter_get_taint (writer));
+  return writer;
+}
+
+static void
+casewriter_translator_write (struct casewriter *writer UNUSED,
+                             void *ct_, struct ccase *c) 
+{
+  struct casewriter_translator *ct = ct_;
+  struct ccase tmp;
+
+  ct->translate (c, &tmp, ct->aux);
+  casewriter_write (ct->subwriter, &tmp);
+}
+
+static void
+casewriter_translator_destroy (struct casewriter *writer UNUSED, void *ct_) 
+{
+  struct casewriter_translator *ct = ct_;
+  casewriter_destroy (ct->subwriter);
+  ct->destroy (ct->aux);
+  free (ct);
+}
+
+static struct casereader *
+casewriter_translator_convert_to_reader (struct casewriter *writer UNUSED,
+                                         void *ct_)
+{
+  struct casewriter_translator *ct = ct_;
+  struct casereader *reader = casewriter_make_reader (ct->subwriter);
+  free (ct);
+  ct->destroy (ct->aux);
+  return reader;
+}
+
+static struct casewriter_class casewriter_translator_class = 
+  {
+    casewriter_translator_write,
+    casewriter_translator_destroy,
+    casewriter_translator_convert_to_reader,
+  };
diff --git a/src/data/casewriter.c b/src/data/casewriter.c
new file mode 100644 (file)
index 0000000..e277749
--- /dev/null
@@ -0,0 +1,287 @@
+/* PSPP - computes sample statistics.
+   Copyright (C) 2007 Free Software Foundation, Inc.
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License as
+   published by the Free Software Foundation; either version 2 of the
+   License, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
+
+#include <config.h>
+
+#include <data/casewriter.h>
+#include <data/casewriter-provider.h>
+
+#include <assert.h>
+#include <stdlib.h>
+
+#include <data/casereader.h>
+#include <data/casereader-provider.h>
+#include <data/casewindow.h>
+#include <data/settings.h>
+#include <libpspp/compiler.h>
+#include <libpspp/taint.h>
+
+#include "xalloc.h"
+
+/* A casewriter. */
+struct casewriter
+  {
+    struct taint *taint;
+    casenumber case_cnt;
+    const struct casewriter_class *class;
+    void *aux;
+  };
+
+static struct casewriter *create_casewriter_window (size_t value_cnt,
+                                                    casenumber max_in_core);
+
+/* Writes case C to WRITER. */
+void
+casewriter_write (struct casewriter *writer, struct ccase *c)
+{
+  writer->class->write (writer, writer->aux, c);
+}
+
+/* Destroys WRITER.
+   Returns true if successful, false if an I/O error was
+   encountered on WRITER or on some object on which WRITER has a
+   dependency. */
+bool
+casewriter_destroy (struct casewriter *writer) 
+{
+  bool ok = true;
+  if (writer != NULL)
+    {
+      writer->class->destroy (writer, writer->aux);
+      ok = taint_destroy (writer->taint);
+      free (writer);
+    }
+  return ok;
+}
+
+/* Destroys WRITER and in its place returns a casereader that can
+   be used to read back the data written to WRITER.  WRITER must
+   not be used again after calling this function, even as an
+   argument to casewriter_destroy.
+
+   Not all casewriters implement this function.  Behavior is
+   undefined if it is called on one that does not.
+
+   If an I/O error was encountered on WRITER or on some object on
+   which WRITER has a dependency, then the error will be
+   propagated to the new casereader. */
+struct casereader *
+casewriter_make_reader (struct casewriter *writer)
+{
+  struct casereader *reader;
+  reader = writer->class->convert_to_reader (writer, writer->aux);
+  taint_propagate (writer->taint, casereader_get_taint (reader));
+  taint_destroy (writer->taint);
+  free (writer);
+  return reader;
+}
+
+/* Returns a copy of WRITER, which is itself destroyed.
+   Useful for taking over ownership of a casewriter, to enforce
+   preventing the original owner from accessing the casewriter
+   again. */
+struct casewriter *
+casewriter_rename (struct casewriter *writer)
+{
+  struct casewriter *new = xmemdup (writer, sizeof *writer);
+  free (writer);
+  return new;
+}
+
+/* Returns true if an I/O error or another hard error has
+   occurred on WRITER, a clone of WRITER, or on some object on
+   which WRITER's data has a dependency, false otherwise. */
+bool
+casewriter_error (const struct casewriter *writer) 
+{
+  return taint_is_tainted (writer->taint);
+}
+
+/* Marks WRITER as having encountered an error.
+
+   Ordinarily, this function should be called by the
+   implementation of a casewriter, not by the casewriter's
+   client.  Instead, casewriter clients should usually ensure
+   that a casewriter's error state is correct by using
+   taint_propagate to propagate to the casewriter's taint
+   structure, which may be obtained via casewriter_get_taint. */
+void
+casewriter_force_error (struct casewriter *writer) 
+{
+  taint_set_taint (writer->taint);
+}
+
+/* Returns WRITER's associate taint object, for use with
+   taint_propagate and other taint functions. */
+const struct taint *
+casewriter_get_taint (const struct casewriter *writer) 
+{
+  return writer->taint;
+}
+
+/* Creates and returns a new casewriter with the given CLASS and
+   auxiliary data AUX. */
+struct casewriter *
+casewriter_create (const struct casewriter_class *class, void *aux) 
+{
+  struct casewriter *writer = xmalloc (sizeof *writer);
+  writer->taint = taint_create ();
+  writer->case_cnt = 0;
+  writer->class = class;
+  writer->aux = aux;
+  return writer;
+}
+
+/* Returns a casewriter for cases with VALUE_CNT struct values
+   per case.  The cases written to the casewriter will be kept in
+   memory, unless the amount of memory used grows too large, in
+   which case they will be written to disk.
+
+   A casewriter created with this function may be passed to
+   casewriter_make_reader. 
+
+   This is usually the right kind of casewriter to use. */
+struct casewriter *
+autopaging_writer_create (size_t value_cnt) 
+{
+  return create_casewriter_window (value_cnt, get_workspace_cases (value_cnt));
+}
+
+/* Returns a casewriter for cases with VALUE_CNT struct values
+   per case.  The cases written to the casewriter will be kept in
+   memory.
+
+   A casewriter created with this function may be passed to
+   casewriter_make_reader. */
+struct casewriter *
+mem_writer_create (size_t value_cnt) 
+{
+  return create_casewriter_window (value_cnt, CASENUMBER_MAX);
+}
+
+/* Returns a casewriter for cases with VALUE_CNT struct values
+   per case.  The cases written to the casewriter will be written
+   to disk.
+
+   A casewriter created with this function may be passed to
+   casewriter_make_reader. */
+struct casewriter *
+tmpfile_writer_create (size_t value_cnt) 
+{
+  return create_casewriter_window (value_cnt, 0);
+}
+\f
+static const struct casewriter_class casewriter_window_class;
+static const struct casereader_random_class casereader_window_class;
+
+/* Creates and returns a new casewriter based on a casewindow.
+   Each of the casewriter's cases are composed of VALUE_CNT
+   struct values.  The casewriter's cases will be maintained in
+   memory until MAX_IN_CORE_CASES have been written, at which
+   point they will be written to disk. */
+static struct casewriter *
+create_casewriter_window (size_t value_cnt, casenumber max_in_core_cases) 
+{
+  struct casewindow *window = casewindow_create (value_cnt, max_in_core_cases);
+  struct casewriter *writer = casewriter_create (&casewriter_window_class,
+                                                 window);
+  taint_propagate (casewindow_get_taint (window),
+                   casewriter_get_taint (writer));
+  return writer;
+}
+
+/* Writes case C to casewindow writer WINDOW. */
+static void
+casewriter_window_write (struct casewriter *writer UNUSED, void *window_,
+                         struct ccase *c) 
+{
+  struct casewindow *window = window_;
+  casewindow_push_head (window, c);
+}
+
+/* Destroys casewindow writer WINDOW. */
+static void
+casewriter_window_destroy (struct casewriter *writer UNUSED, void *window_)
+{
+  struct casewindow *window = window_;
+  casewindow_destroy (window);
+}
+
+/* Converts casewindow writer WINDOW to a casereader and returns
+   the casereader. */
+static struct casereader *
+casewriter_window_convert_to_reader (struct casewriter *writer UNUSED,
+                                     void *window_) 
+{
+  struct casewindow *window = window_;
+  struct casereader *reader;
+  reader = casereader_create_random (casewindow_get_value_cnt (window),
+                                     casewindow_get_case_cnt (window),
+                                     &casereader_window_class, window);
+  taint_propagate (casewindow_get_taint (window),
+                   casereader_get_taint (reader));
+  return reader;
+}
+
+/* Reads the case at the given 0-based OFFSET from the front of
+   WINDOW into C.  Returns true if successful, false if
+   OFFSET is beyond the end of file or upon I/O error. */
+static bool
+casereader_window_read (struct casereader *reader UNUSED, void *window_,
+                        casenumber offset, struct ccase *c) 
+{
+  struct casewindow *window = window_;
+  if (offset >= casewindow_get_case_cnt (window))
+    return false;
+  else
+    return casewindow_get_case (window, offset, c);
+}
+
+/* Destroys casewindow reader WINDOW. */
+static void
+casereader_window_destroy (struct casereader *reader UNUSED, void *window_)
+{
+  struct casewindow *window = window_;
+  casewindow_destroy (window);
+}
+
+/* Discards CASE_CNT cases from the front of WINDOW. */
+static void
+casereader_window_advance (struct casereader *reader UNUSED, void *window_,
+                           casenumber case_cnt) 
+{
+  struct casewindow *window = window_;
+  casewindow_pop_tail (window, case_cnt);
+}
+
+/* Class for casewindow writer. */
+static const struct casewriter_class casewriter_window_class = 
+  {
+    casewriter_window_write,
+    casewriter_window_destroy,
+    casewriter_window_convert_to_reader,
+  };
+
+/* Class for casewindow reader. */
+static const struct casereader_random_class casereader_window_class = 
+  {
+    casereader_window_read,
+    casereader_window_destroy,
+    casereader_window_advance,
+  };
+\f
diff --git a/src/data/casewriter.h b/src/data/casewriter.h
new file mode 100644 (file)
index 0000000..a1d0807
--- /dev/null
@@ -0,0 +1,52 @@
+/* PSPP - computes sample statistics.
+   Copyright (C) 2007 Free Software Foundation, Inc.
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License as
+   published by the Free Software Foundation; either version 2 of the
+   License, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
+
+#ifndef DATA_CASEWRITER_H
+#define DATA_CASEWRITER_H 1
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <data/transformations.h>
+#include <libpspp/compiler.h>
+
+struct casewriter;
+
+void casewriter_write (struct casewriter *, struct ccase *);
+bool casewriter_destroy (struct casewriter *);
+
+struct casereader *casewriter_make_reader (struct casewriter *);
+
+struct casewriter *casewriter_rename (struct casewriter *);
+
+bool casewriter_error (const struct casewriter *);
+void casewriter_force_error (struct casewriter *);
+const struct taint *casewriter_get_taint (const struct casewriter *);
+
+struct casewriter *mem_writer_create (size_t value_cnt);
+struct casewriter *tmpfile_writer_create (size_t value_cnt);
+struct casewriter *autopaging_writer_create (size_t value_cnt);
+\f
+struct casewriter *
+casewriter_create_translator (struct casewriter *, 
+                              void (*translate) (const struct ccase *input,
+                                                 struct ccase *output,
+                                                 void *aux),
+                              bool (*destroy) (void *aux),
+                              void *aux);
+
+#endif /* data/casewriter.h */