casereader: Make parameter to casereader_count_cases const.
[pspp-builds.git] / src / data / casereader.c
index 3d27a9192a79bd63466e25e2059ce91824e94241..97b7ceff3d1b7eb19e8ac6243aef08eb961a0f6b 100644 (file)
@@ -34,7 +34,7 @@
 struct casereader
   {
     struct taint *taint;                  /* Corrupted? */
-    size_t value_cnt;                     /* Values per case. */
+    struct caseproto *proto;              /* Format of contained cases. */
     casenumber case_cnt;                  /* Number of cases,
                                              CASENUMBER_MAX if unknown. */
     const struct casereader_class *class; /* Class. */
@@ -74,7 +74,10 @@ casereader_read (struct casereader *reader)
       c = reader->class->read (reader, reader->aux);
       if (c != NULL)
         {
-          assert (case_get_value_cnt (c) >= reader->value_cnt);
+          size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
+          assert (case_get_value_cnt (c) >= n_widths);
+          expensive_assert (caseproto_equal (case_get_proto (c), 0,
+                                             reader->proto, 0, n_widths));
           return c;
         }
     }
@@ -93,6 +96,7 @@ casereader_destroy (struct casereader *reader)
     {
       reader->class->destroy (reader, reader->aux);
       ok = taint_destroy (reader->taint);
+      caseproto_unref (reader->proto);
       free (reader);
     }
   return ok;
@@ -104,7 +108,7 @@ casereader_destroy (struct casereader *reader)
 struct casereader *
 casereader_clone (const struct casereader *reader_)
 {
-  struct casereader *reader = (struct casereader *) reader_;
+  struct casereader *reader = CONST_CAST (struct casereader *, reader_);
   struct casereader *clone;
   if ( reader == NULL ) 
     return NULL;
@@ -253,6 +257,20 @@ casereader_get_case_cnt (struct casereader *reader)
   return reader->case_cnt;
 }
 
+static casenumber
+casereader_count_cases__ (const struct casereader *reader,
+                          casenumber max_cases)
+{
+  struct casereader *clone;
+  casenumber n_cases;
+
+  clone = casereader_clone (reader);
+  n_cases = casereader_advance (clone, max_cases);
+  casereader_destroy (clone);
+
+  return n_cases;
+}
+
 /* Returns the number of cases that will be read by successive
    calls to casereader_read for READER, assuming that no errors
    occur.  Upon an error condition, the case count drops to 0, so
@@ -263,31 +281,57 @@ casereader_get_case_cnt (struct casereader *reader)
    of the contents of a clone of READER.  Thus, the return value
    is always correct in the absence of I/O errors. */
 casenumber
-casereader_count_cases (struct casereader *reader)
+casereader_count_cases (const struct casereader *reader)
 {
   if (reader->case_cnt == CASENUMBER_MAX)
     {
-      casenumber n_cases = 0;
-      struct ccase *c;
+      struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
+      reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
+    }
+  return reader->case_cnt;
+}
+
+/* Truncates READER to at most N cases. */
+void
+casereader_truncate (struct casereader *reader, casenumber n)
+{
+  /* This could be optimized, if it ever becomes too expensive, by adding a
+     "max_cases" member to struct casereader.  We could also add a "truncate"
+     function to the casereader implementation, to allow the casereader to
+     throw away data that cannot ever be read. */
+  if (reader->case_cnt == CASENUMBER_MAX)
+    reader->case_cnt = casereader_count_cases__ (reader, n);
+  if (reader->case_cnt > n)
+    reader->case_cnt = n;
+}
 
-      struct casereader *clone = casereader_clone (reader);
+/* Returns the prototype for the cases in READER.  The caller
+   must not unref the returned prototype. */
+const struct caseproto *
+casereader_get_proto (const struct casereader *reader)
+{
+  return reader->proto;
+}
 
-      for (; (c = casereader_read (clone)) != NULL; case_unref (c))
-        n_cases++;
+/* Skips past N cases in READER, stopping when the last case in
+   READER has been read or on an input error.  Returns the number
+   of cases successfully skipped. */
+casenumber
+casereader_advance (struct casereader *reader, casenumber n)
+{
+  casenumber i;
 
-      casereader_destroy (clone);
-      reader->case_cnt = n_cases;
+  for (i = 0; i < n; i++)
+    {
+      struct ccase *c = casereader_read (reader);
+      if (c == NULL)
+        break;
+      case_unref (c);
     }
 
-  return reader->case_cnt;
+  return i;
 }
 
-/* Returns the number of struct values in each case in READER. */
-size_t
-casereader_get_value_cnt (struct casereader *reader)
-{
-  return reader->value_cnt;
-}
 
 /* Copies all the cases in READER to WRITER, propagating errors
    appropriately. */
@@ -320,8 +364,9 @@ casereader_transfer (struct casereader *reader, struct casewriter *writer)
    function, in which case the cloned casereader should have the
    same taint object as the original casereader.)
 
-   VALUE_CNT must be the number of struct values per case read
-   from the casereader.
+   PROTO must be the prototype for the cases that may be read
+   from the casereader.  The caller retains its reference to
+   PROTO.
 
    CASE_CNT is an upper limit on the number of cases that
    casereader_read will return from the casereader in successive
@@ -334,12 +379,13 @@ casereader_transfer (struct casereader *reader, struct casewriter *writer)
    functions, respectively. */
 struct casereader *
 casereader_create_sequential (const struct taint *taint,
-                              size_t value_cnt, casenumber case_cnt,
+                              const struct caseproto *proto,
+                              casenumber case_cnt,
                               const struct casereader_class *class, void *aux)
 {
   struct casereader *reader = xmalloc (sizeof *reader);
   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
-  reader->value_cnt = value_cnt;
+  reader->proto = caseproto_ref (proto);
   reader->case_cnt = case_cnt;
   reader->class = class;
   reader->aux = aux;
@@ -434,8 +480,9 @@ compare_random_readers_by_offset (const struct heap_node *a_,
    casereader_create_sequential is more appropriate for a data
    source that is naturally sequential.
 
-   VALUE_CNT must be the number of struct values per case read
-   from the casereader.
+   PROTO must be the prototype for the cases that may be read
+   from the casereader.  The caller retains its reference to
+   PROTO.
 
    CASE_CNT is an upper limit on the number of cases that
    casereader_read will return from the casereader in successive
@@ -447,7 +494,7 @@ compare_random_readers_by_offset (const struct heap_node *a_,
    member functions and auxiliary data to pass to those member
    functions, respectively. */
 struct casereader *
-casereader_create_random (size_t value_cnt, casenumber case_cnt,
+casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
                           const struct casereader_random_class *class,
                           void *aux)
 {
@@ -456,7 +503,7 @@ casereader_create_random (size_t value_cnt, casenumber case_cnt,
   shared->class = class;
   shared->aux = aux;
   shared->min_offset = 0;
-  return casereader_create_sequential (NULL, value_cnt, case_cnt,
+  return casereader_create_sequential (NULL, proto, case_cnt,
                                        &random_reader_casereader_class,
                                        make_random_reader (shared, 0));
 }
@@ -524,7 +571,7 @@ random_reader_clone (struct casereader *reader, void *br_)
   struct random_reader *br = br_;
   struct random_reader_shared *shared = br->shared;
   return casereader_create_sequential (casereader_get_taint (reader),
-                                       casereader_get_value_cnt (reader),
+                                       reader->proto,
                                        casereader_get_case_cnt (reader),
                                        &random_reader_casereader_class,
                                        make_random_reader (shared,
@@ -588,12 +635,11 @@ static const struct casereader_random_class shim_class;
 static void
 insert_shim (struct casereader *reader)
 {
-  size_t value_cnt = casereader_get_value_cnt (reader);
+  const struct caseproto *proto = casereader_get_proto (reader);
   casenumber case_cnt = casereader_get_case_cnt (reader);
   struct shim *b = xmalloc (sizeof *b);
-  b->window = casewindow_create (value_cnt, settings_get_workspace_cases (value_cnt));
-  b->subreader = casereader_create_random (value_cnt, case_cnt,
-                                           &shim_class, b);
+  b->window = casewindow_create (proto, settings_get_workspace_cases (proto));
+  b->subreader = casereader_create_random (proto, case_cnt, &shim_class, b);
   casereader_swap (reader, b->subreader);
   taint_propagate (casewindow_get_taint (b->window),
                    casereader_get_taint (reader));
@@ -657,3 +703,42 @@ static const struct casereader_random_class shim_class =
     shim_destroy,
     shim_advance,
   };
+\f
+static const struct casereader_class casereader_null_class;
+
+/* Returns a casereader with no cases.  The casereader has the prototype
+   specified by PROTO.  PROTO may be specified as a null pointer, in which case
+   the casereader has no variables. */
+struct casereader *
+casereader_create_empty (const struct caseproto *proto_)
+{
+  struct casereader *reader;
+  struct caseproto *proto;
+
+  proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
+  reader = casereader_create_sequential (NULL, proto, 0,
+                                         &casereader_null_class, NULL);
+  caseproto_unref (proto);
+
+  return reader;
+}
+
+static struct ccase *
+casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
+{
+  return NULL;
+}
+
+static void
+casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
+{
+  /* Nothing to do. */
+}
+
+static const struct casereader_class casereader_null_class =
+  {
+    casereader_null_read,
+    casereader_null_destroy,
+    NULL,                       /* clone */
+    NULL,                       /* peek */
+  };