Added abstract factory to create casefiles. Updated procedures to use
[pspp-builds.git] / src / data / procedure.c
index b59f128299ecce3efd0056922360ec6acb59fba9..8aec2c9fe05092d1beded5ca2bddafbc12048e1d 100644 (file)
@@ -1,6 +1,5 @@
 /* PSPP - computes sample statistics.
    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
-   Written by Ben Pfaff <blp@gnu.org>.
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
 
-/* Procedure execution data. */
-struct write_case_data
-  {
-    /* Function to call for each case. */
-    case_func *proc;
-    void *aux;
+struct dataset {
 
-    struct dataset *dataset;    /* The dataset concerned */
-    struct ccase trns_case;     /* Case used for transformations. */
-    struct ccase sink_case;     /* Case written to sink, if
-                                   compacting is necessary. */
-    size_t cases_written;       /* Cases output so far. */
-  };
+  /* An abstract factory which creates casefiles */
+  struct casefile_factory *cf_factory;
 
-struct dataset {
   /* Cases are read from proc_source,
      pass through permanent_trns_chain (which transforms them into
      the format described by permanent_dict),
@@ -85,6 +74,13 @@ struct dataset {
   int lag_head;                /* Index where next case will be added. */
   struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
 
+  /* Procedure data. */
+  bool is_open;               /* Procedure open? */
+  struct ccase trns_case;     /* Case used for transformations. */
+  struct ccase sink_case;     /* Case written to sink, if
+                                 compacting is necessary. */
+  size_t cases_written;       /* Cases output so far. */
+  bool ok;
 }; /* struct dataset */
 
 
@@ -97,7 +93,6 @@ static bool internal_procedure (struct dataset *ds, case_func *,
 static void update_last_proc_invocation (struct dataset *ds);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (struct dataset *ds);
-static bool write_case (struct write_case_data *wc_data);
 static void lag_case (struct dataset *ds, const struct ccase *c);
 static void clear_case (const struct dataset *ds, struct ccase *c);
 static bool close_active_file (struct dataset *ds);
@@ -135,6 +130,23 @@ time_of_last_procedure (struct dataset *ds)
 bool
 procedure (struct dataset *ds, case_func *cf, void *aux)
 {
+  update_last_proc_invocation (ds);
+
+  /* Optimize the trivial case where we're not going to do
+     anything with the data, by not reading the data at all. */
+  if (cf == NULL
+      && case_source_is_class (ds->proc_source, &storage_source_class)
+      && ds->proc_sink == NULL
+      && (ds->temporary_trns_chain == NULL
+          || trns_chain_is_empty (ds->temporary_trns_chain))
+      && trns_chain_is_empty (ds->permanent_trns_chain))
+    {
+      ds->n_lag = 0;
+      dict_set_case_limit (ds->dict, 0);
+      dict_clear_vectors (ds->dict);
+      return true;
+    }
+
   return internal_procedure (ds, cf, NULL, aux);
 }
 \f
@@ -174,7 +186,10 @@ multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
   struct multipass_aux_data aux_data;
   bool ok;
 
-  aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
+  aux_data.casefile =
+    ds->cf_factory->create_casefile (ds->cf_factory,
+                                    dict_get_next_value_idx (ds->dict));
+
   aux_data.proc_func = proc_func;
   aux_data.aux = aux;
 
@@ -188,7 +203,6 @@ multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
 \f
 /* Procedure implementation. */
 
-
 /* Executes a procedure.
    Passes each case to CASE_FUNC.
    Calls END_FUNC after the last case.
@@ -199,50 +213,132 @@ internal_procedure (struct dataset *ds, case_func *proc,
                    end_func *end,
                     void *aux) 
 {
-  struct write_case_data wc_data;
+  struct ccase *c;
   bool ok = true;
+  
+  proc_open (ds);
+  while (ok && proc_read (ds, &c))
+    if (proc != NULL)
+      ok = proc (c, aux, ds) && ok;
+  if (end != NULL)
+    ok = end (aux, ds) && ok;
+  return proc_close (ds) && ok;
+}
 
+/* Opens dataset DS for reading cases with proc_read.
+   proc_close must be called when done. */
+void
+proc_open (struct dataset *ds)
+{
   assert (ds->proc_source != NULL);
+  assert (!ds->is_open);
 
   update_last_proc_invocation (ds);
 
-  /* Optimize the trivial case where we're not going to do
-     anything with the data, by not reading the data at all. */
-  if (proc == NULL && end == NULL
-      && case_source_is_class (ds->proc_source, &storage_source_class)
-      && ds->proc_sink == NULL
-      && (ds->temporary_trns_chain == NULL
-          || trns_chain_is_empty (ds->temporary_trns_chain))
-      && trns_chain_is_empty (ds->permanent_trns_chain))
+  open_active_file (ds);
+
+  ds->is_open = true;
+  create_trns_case (&ds->trns_case, ds->dict);
+  case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
+  ds->cases_written = 0;
+  ds->ok = true;
+}
+
+/* Reads the next case from dataset DS, which must have been
+   opened for reading with proc_open.
+   Returns true if successful, in which case a pointer to the
+   case is stored in *C.
+   Return false at end of file or if a read error occurs.  In
+   this case a null pointer is stored in *C. */
+bool
+proc_read (struct dataset *ds, struct ccase **c) 
+{
+  enum trns_result retval = TRNS_DROP_CASE;
+
+  assert (ds->is_open);
+  *c = NULL;
+  for (;;) 
     {
-      ds->n_lag = 0;
-      dict_set_case_limit (ds->dict, 0);
-      dict_clear_vectors (ds->dict);
-      return true;
-    }
+      size_t case_nr;
+
+      assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
+      if (retval == TRNS_ERROR)
+        ds->ok = false;
+      if (!ds->ok)
+        return false;
+
+      /* Read a case from proc_source. */
+      clear_case (ds, &ds->trns_case);
+      if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
+        return false;
+
+      /* Execute permanent transformations.  */
+      case_nr = ds->cases_written + 1;
+      retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
+                                   &ds->trns_case, &case_nr);
+      if (retval != TRNS_CONTINUE)
+        continue;
   
-  open_active_file (ds);
+      /* Write case to LAG queue. */
+      if (ds->n_lag)
+        lag_case (ds, &ds->trns_case);
+
+      /* Write case to replacement active file. */
+      ds->cases_written++;
+      if (ds->proc_sink->class->write != NULL) 
+        {
+          if (ds->compactor != NULL) 
+            {
+              dict_compactor_compact (ds->compactor, &ds->sink_case,
+                                      &ds->trns_case);
+              ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
+            }
+          else
+            ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
+        }
   
-  wc_data.proc = proc;
-  wc_data.aux = aux;
-  wc_data.dataset = ds;
-  create_trns_case (&wc_data.trns_case, ds->dict);
-  case_create (&wc_data.sink_case,
-               dict_get_compacted_value_cnt (ds->dict));
-  wc_data.cases_written = 0;
-
-  ok = ds->proc_source->class->read (ds->proc_source,
-                                 &wc_data.trns_case,
-                                 write_case, &wc_data) && ok;
-  if (end != NULL)
-    ok = end (aux, ds) && ok;
+      /* Execute temporary transformations. */
+      if (ds->temporary_trns_chain != NULL) 
+        {
+          retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
+                                       &ds->trns_case, &ds->cases_written);
+          if (retval != TRNS_CONTINUE)
+            continue;
+        }
 
-  case_destroy (&wc_data.sink_case);
-  case_destroy (&wc_data.trns_case);
+      *c = &ds->trns_case;
+      return true; 
+    }
+}
 
-  ok = close_active_file (ds) && ok;
+/* Closes dataset DS for reading.
+   Returns true if successful, false if an I/O error occurred
+   while reading or closing the data set.
+   If DS has not been opened, returns true without doing
+   anything else. */
+bool
+proc_close (struct dataset *ds) 
+{
+  if (!ds->is_open)
+    return true;
 
-  return ok;
+  /* Drain any remaining cases. */
+  while (ds->ok) 
+    {
+      struct ccase *c;
+      if (!proc_read (ds, &c))
+        break; 
+    }
+  ds->ok = free_case_source (ds->proc_source) && ds->ok;
+  ds->proc_source = NULL;
+
+  case_destroy (&ds->sink_case);
+  case_destroy (&ds->trns_case);
+
+  ds->ok = close_active_file (ds) && ds->ok;
+  ds->is_open = false;
+
+  return ds->ok;
 }
 
 /* Updates last_proc_invocation. */
@@ -265,12 +361,12 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict)
   for (i = 0; i < var_cnt; i++) 
     {
       struct variable *v = dict_get_var (dict, i);
-      union value *value = case_data_rw (trns_case, v->fv);
+      union value *value = case_data_rw (trns_case, v);
 
-      if (v->type == NUMERIC)
-        value->f = v->leave ? 0.0 : SYSMIS;
+      if (var_is_numeric (v))
+        value->f = var_get_leave (v) ? 0.0 : SYSMIS;
       else
-        memset (value->s, ' ', v->width);
+        memset (value->s, ' ', var_get_width (v));
     }
 }
 
@@ -298,7 +394,10 @@ open_active_file (struct dataset *ds)
 
   /* Prepare sink. */
   if (ds->proc_sink == NULL)
-    ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
+    ds->proc_sink = create_case_sink (&storage_sink_class,
+                                     ds->permanent_dict,
+                                     ds->cf_factory,
+                                     NULL);
   if (ds->proc_sink->class->open != NULL)
     ds->proc_sink->class->open (ds->proc_sink);
 
@@ -315,63 +414,6 @@ open_active_file (struct dataset *ds)
     }
 }
 
-/* Transforms trns_case and writes it to the replacement active
-   file if advisable.  Returns true if more cases can be
-   accepted, false otherwise.  Do not call this function again
-   after it has returned false once.  */
-static bool
-write_case (struct write_case_data *wc_data)
-{
-  enum trns_result retval;
-  size_t case_nr;
-
-  struct dataset *ds = wc_data->dataset;
-  
-  /* Execute permanent transformations.  */
-  case_nr = wc_data->cases_written + 1;
-  retval = trns_chain_execute (ds->permanent_trns_chain,
-                               &wc_data->trns_case, &case_nr);
-  if (retval != TRNS_CONTINUE)
-    goto done;
-
-  /* Write case to LAG queue. */
-  if (ds->n_lag)
-    lag_case (ds, &wc_data->trns_case);
-
-  /* Write case to replacement active file. */
-  wc_data->cases_written++;
-  if (ds->proc_sink->class->write != NULL) 
-    {
-      if (ds->compactor != NULL) 
-        {
-          dict_compactor_compact (ds->compactor, &wc_data->sink_case,
-                                  &wc_data->trns_case);
-          ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case);
-        }
-      else
-        ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case);
-    }
-  
-  /* Execute temporary transformations. */
-  if (ds->temporary_trns_chain != NULL) 
-    {
-      retval = trns_chain_execute (ds->temporary_trns_chain,
-                                   &wc_data->trns_case,
-                                   &wc_data->cases_written);
-      if (retval != TRNS_CONTINUE)
-        goto done;
-    }
-
-  /* Pass case to procedure. */
-  if (wc_data->proc != NULL)
-    if (!wc_data->proc (&wc_data->trns_case, wc_data->aux, ds))
-      retval = TRNS_ERROR;
-
- done:
-  clear_case (ds, &wc_data->trns_case);
-  return retval != TRNS_ERROR;
-}
-
 /* Add C to the lag queue. */
 static void
 lag_case (struct dataset *ds, const struct ccase *c)
@@ -395,12 +437,12 @@ clear_case (const struct dataset *ds, struct ccase *c)
   for (i = 0; i < var_cnt; i++) 
     {
       struct variable *v = dict_get_var (ds->dict, i);
-      if (!v->leave
+      if (!var_get_leave (v)
         {
-          if (v->type == NUMERIC)
-            case_data_rw (c, v->fv)->f = SYSMIS;
+          if (var_is_numeric (v))
+            case_data_rw (c, v)->f = SYSMIS; 
           else
-            memset (case_data_rw (c, v->fv)->s, ' ', v->width);
+            memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
         } 
     }
 }
@@ -431,10 +473,6 @@ close_active_file (struct dataset *ds)
       ds->compactor = NULL;
     }
     
-  /* Free data source. */
-  free_case_source (ds->proc_source);
-  ds->proc_source = NULL;
-
   /* Old data sink becomes new data source. */
   if (ds->proc_sink->class->make_source != NULL)
     ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
@@ -633,7 +671,8 @@ multipass_split_case_func (const struct ccase *c, void *aux_, const struct datas
 
       /* Start a new casefile. */
       aux->casefile = 
-       fastfile_create (dict_get_next_value_idx (ds->dict));
+       ds->cf_factory->create_casefile (ds->cf_factory,
+                                        dict_get_next_value_idx (ds->dict));
     }
 
   return casefile_append (aux->casefile, c) && ok;
@@ -805,10 +844,11 @@ proc_cancel_all_transformations (struct dataset *ds)
 \f
 /* Initializes procedure handling. */
 struct dataset *
-create_dataset (void)
+create_dataset (struct casefile_factory *fact)
 {
   struct dataset *ds = xzalloc (sizeof(*ds));
   ds->dict = dict_create ();
+  ds->cf_factory = fact;
   proc_cancel_all_transformations (ds);
   return ds;
 }
@@ -937,8 +977,8 @@ filter_trns_proc (void *filter_var_,
   
 {
   struct variable *filter_var = filter_var_;
-  double f = case_num (c, filter_var->fv);
-  return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
+  double f = case_num (c, filter_var);
+  return (f != 0.0 && !var_is_num_missing (filter_var, f)
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }
 
@@ -969,3 +1009,9 @@ dataset_set_n_lag (struct dataset *ds, int n_lag)
 }
 
 
+struct casefile_factory *
+dataset_get_casefile_factory (const struct dataset *ds)
+{
+  return ds->cf_factory;
+}
+