Actually implement the new procedure code and adapt all of its clients
[pspp-builds.git] / src / language / stats / aggregate.c
index 297d2abee4cc495cc17f1aaedd4da0d9e00f53bf..22ca39d76373c7bd1634e1d5ad92ef1c0f96ecf8 100644 (file)
 #include <stdlib.h>
 
 #include <data/any-writer.h>
-#include <data/case-sink.h>
+#include <data/case-ordering.h>
 #include <data/case.h>
-#include <data/casefile.h>
+#include <data/casegrouper.h>
+#include <data/casereader.h>
+#include <data/casewriter.h>
 #include <data/dictionary.h>
 #include <data/file-handle-def.h>
 #include <data/format.h>
 #include <data/procedure.h>
 #include <data/settings.h>
-#include <data/storage-stream.h>
 #include <data/sys-file-writer.h>
 #include <data/variable.h>
 #include <language/command.h>
@@ -135,12 +136,8 @@ enum missing_treatment
 /* An entire AGGREGATE procedure. */
 struct agr_proc 
   {
-    /* We have either an output file or a sink. */
-    struct any_writer *writer;          /* Output file, or null if none. */
-    struct case_sink *sink;             /* Sink, or null if none. */
-
     /* Break variables. */
-    struct sort_criteria *sort;         /* Sort criteria. */
+    struct case_ordering *sort;         /* Sort criteria. */
     const struct variable **break_vars;       /* Break variables. */
     size_t break_var_cnt;               /* Number of break variables. */
     struct ccase break_case;            /* Last values of break variables. */
@@ -150,20 +147,18 @@ struct agr_proc
     struct dictionary *dict;            /* Aggregate dictionary. */
     const struct dictionary *src_dict;  /* Dict of the source */
     int case_cnt;                       /* Counts aggregated cases. */
-    struct ccase agr_case;              /* Aggregate case for output. */
   };
 
 static void initialize_aggregate_info (struct agr_proc *,
                                        const struct ccase *);
-
+static void accumulate_aggregate_info (struct agr_proc *,
+                                       const struct ccase *);
 /* Prototypes. */
 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
                                       struct agr_proc *);
 static void agr_destroy (struct agr_proc *);
-static bool aggregate_single_case (struct agr_proc *agr,
-                                  const struct ccase *input,
-                                  struct ccase *output);
-static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
+static void dump_aggregate_info (struct agr_proc *agr,
+                                 struct casewriter *output);
 \f
 /* Parsing. */
 
@@ -174,10 +169,14 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
   struct dictionary *dict = dataset_dict (ds);
   struct agr_proc agr;
   struct file_handle *out_file = NULL;
+  struct casereader *input = NULL, *group;
+  struct casegrouper *grouper;
+  struct casewriter *output = NULL;
 
   bool copy_documents = false;
   bool presorted = false;
   bool saw_direction;
+  bool ok;
 
   memset(&agr, 0 , sizeof (agr));
   agr.missing = ITEMWISE;
@@ -223,11 +222,13 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
           int i;
 
          lex_match (lexer, '=');
-          agr.sort = sort_parse_criteria (lexer, dict,
-                                          &agr.break_vars, &agr.break_var_cnt,
-                                          &saw_direction, NULL);
+          agr.sort = parse_case_ordering (lexer, dict,
+                                          
+                                          &saw_direction);
           if (agr.sort == NULL)
             goto error;
+          case_ordering_get_vars (agr.sort,
+                                  &agr.break_vars, &agr.break_var_cnt);
          
           for (i = 0; i < agr.break_var_cnt; i++)
             dict_clone_var_assert (agr.dict, agr.break_vars[i],
@@ -261,109 +262,69 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
   
   /* Initialize. */
   agr.case_cnt = 0;
-  case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
 
-  /* Output to active file or external file? */
   if (out_file == NULL) 
     {
-      struct ccase *c;
-      
       /* The active file will be replaced by the aggregated data,
          so TEMPORARY is moot. */
       proc_cancel_temporary_transformations (ds);
+      proc_discard_output (ds);
+      output = autopaging_writer_create (dict_get_next_value_idx (agr.dict));
+    }
+  else 
+    {
+      output = any_writer_open (out_file, agr.dict);
+      if (output == NULL)
+        goto error;
+    }
 
-      if (agr.sort != NULL && !presorted) 
-        {
-          if (!sort_active_file_in_place (ds, agr.sort))
-            goto error;
-        }
+  input = proc_open (ds);
+  if (agr.sort != NULL && !presorted) 
+    {
+      input = sort_execute (input, agr.sort);
+      agr.sort = NULL; 
+    }
 
-      agr.sink = create_case_sink (&storage_sink_class, agr.dict,
-                                  dataset_get_casefile_factory (ds),
-                                  NULL);
-      if (agr.sink->class->open != NULL)
-        agr.sink->class->open (agr.sink);
-      proc_set_sink (ds, 
-                    create_case_sink (&null_sink_class, dict,
-                                      dataset_get_casefile_factory (ds),
-                                      NULL));
-      proc_open (ds);
-      while (proc_read (ds, &c))
-        if (aggregate_single_case (&agr, c, &agr.agr_case)) 
-          if (!agr.sink->class->write (agr.sink, &agr.agr_case)) 
-            {
-              proc_close (ds);
-              goto error; 
-            }
-      if (!proc_close (ds))
-        goto error;
+  for (grouper = casegrouper_create_vars (input, agr.break_vars,
+                                          agr.break_var_cnt);
+       casegrouper_get_next_group (grouper, &group);
+       casereader_destroy (group)) 
+    {
+      struct ccase c;
+      
+      if (!casereader_peek (group, 0, &c))
+        continue;
+      initialize_aggregate_info (&agr, &c);
+      case_destroy (&c);
+
+      for (; casereader_read (group, &c); case_destroy (&c)) 
+        accumulate_aggregate_info (&agr, &c);
+      dump_aggregate_info (&agr, output);
+    }
+  if (!casegrouper_destroy (grouper))
+    goto error;
 
-      if (agr.case_cnt > 0) 
-        {
-          dump_aggregate_info (&agr, &agr.agr_case);
-          if (!agr.sink->class->write (agr.sink, &agr.agr_case))
-            goto error;
-        }
-      discard_variables (ds);
-      dataset_set_dict (ds, agr.dict);
-      agr.dict = NULL;
-      proc_set_source (ds, agr.sink->class->make_source (agr.sink));
-      free_case_sink (agr.sink);
+  if (!proc_commit (ds)) 
+    {
+      input = NULL;
+      goto error;
     }
-  else
+  input = NULL;
+
+  if (out_file == NULL) 
     {
-      agr.writer = any_writer_open (out_file, agr.dict);
-      if (agr.writer == NULL)
+      struct casereader *next_input = casewriter_make_reader (output);
+      if (next_input == NULL)
         goto error;
       
-      if (agr.sort != NULL && !presorted) 
-        {
-          /* Sorting is needed. */
-          struct casefile *dst;
-          struct casereader *reader;
-          struct ccase c;
-          bool ok = true;
-          
-          dst = sort_active_file_to_casefile (ds, agr.sort);
-          if (dst == NULL)
-            goto error;
-          reader = casefile_get_destructive_reader (dst);
-          while (ok && casereader_read_xfer (reader, &c)) 
-            {
-              if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
-                ok = any_writer_write (agr.writer, &agr.agr_case);
-              case_destroy (&c);
-            }
-          casereader_destroy (reader);
-          if (ok)
-            ok = !casefile_error (dst);
-          casefile_destroy (dst);
-          if (!ok)
-            goto error;
-        }
-      else 
-        {
-          /* Active file is already sorted. */
-          struct ccase *c;
-          
-          proc_open (ds);
-          while (proc_read (ds, &c))
-            if (aggregate_single_case (&agr, c, &agr.agr_case)) 
-              if (!any_writer_write (agr.writer, &agr.agr_case)) 
-                {
-                  proc_close (ds);
-                  goto error;
-                }
-          if (!proc_close (ds))
-            goto error;
-        }
-      
-      if (agr.case_cnt > 0) 
-        {
-          dump_aggregate_info (&agr, &agr.agr_case);
-          any_writer_write (agr.writer, &agr.agr_case);
-        }
-      if (any_writer_error (agr.writer))
+      proc_set_active_file (ds, next_input, agr.dict);
+      agr.dict = NULL;
+    }
+  else 
+    {
+      ok = casewriter_destroy (output);
+      output = NULL;
+      if (!ok)
         goto error;
     }
   
@@ -371,6 +332,9 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
   return CMD_SUCCESS;
 
 error:
+  if (input != NULL)
+    proc_commit (ds);
+  casewriter_destroy (output);
   agr_destroy (&agr);
   return CMD_CASCADING_FAILURE;
 }
@@ -717,9 +681,7 @@ agr_destroy (struct agr_proc *agr)
 {
   struct agr_var *iter, *next;
 
-  any_writer_close (agr->writer);
-  if (agr->sort != NULL)
-    sort_destroy_criteria (agr->sort);
+  case_ordering_destroy (agr->sort);
   free (agr->break_vars);
   case_destroy (&agr->break_case);
   for (iter = agr->agr_vars; iter; iter = next)
@@ -742,44 +704,13 @@ agr_destroy (struct agr_proc *agr)
     }
   if (agr->dict != NULL)
     dict_destroy (agr->dict);
-
-  case_destroy (&agr->agr_case);
 }
 \f
 /* Execution. */
 
-static void accumulate_aggregate_info (struct agr_proc *,
-                                       const struct ccase *);
-static void dump_aggregate_info (struct agr_proc *, struct ccase *);
-
-/* Processes a single case INPUT for aggregation.  If output is
-   warranted, writes it to OUTPUT and returns true.
-   Otherwise, returns false and OUTPUT is unmodified. */
-static bool
-aggregate_single_case (struct agr_proc *agr,
-                       const struct ccase *input, struct ccase *output)
-{
-  bool finished_group = false;
-  
-  if (agr->case_cnt++ == 0)
-    initialize_aggregate_info (agr, input);
-  else if (case_compare (&agr->break_case, input,
-                         agr->break_vars, agr->break_var_cnt))
-    {
-      dump_aggregate_info (agr, output);
-      finished_group = true;
-
-      initialize_aggregate_info (agr, input);
-    }
-
-  accumulate_aggregate_info (agr, input);
-  return finished_group;
-}
-
 /* Accumulates aggregation data from the case INPUT. */
 static void 
-accumulate_aggregate_info (struct agr_proc *agr,
-                           const struct ccase *input)
+accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
 {
   struct agr_var *iter;
   double weight;
@@ -947,12 +878,14 @@ accumulate_aggregate_info (struct agr_proc *agr,
     }
 }
 
-/* We've come to a record that differs from the previous in one or
-   more of the break variables.  Make an output record from the
-   accumulated statistics in the OUTPUT case. */
+/* Writes an aggregated record to OUTPUT. */
 static void 
-dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
+dump_aggregate_info (struct agr_proc *agr, struct casewriter *output)
 {
+  struct ccase c;
+
+  case_create (&c, dict_get_next_value_idx (agr->dict));
+
   {
     int value_idx = 0;
     int i;
@@ -961,7 +894,7 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
       {
         const struct variable *v = agr->break_vars[i];
         size_t value_cnt = var_get_value_cnt (v);
-        memcpy (case_data_rw_idx (output, value_idx),
+        memcpy (case_data_rw_idx (&c, value_idx),
                 case_data (&agr->break_case, v),
                 sizeof (union value) * value_cnt);
         value_idx += value_cnt; 
@@ -973,7 +906,7 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
   
     for (i = agr->agr_vars; i; i = i->next)
       {
-       union value *v = case_data_rw (output, i->dest);
+       union value *v = case_data_rw (&c, i->dest);
 
        if (agr->missing == COLUMNWISE && i->saw_missing
            && (i->function & FUNC) != N && (i->function & FUNC) != NU
@@ -1076,6 +1009,8 @@ dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
          }
       }
   }
+
+  casewriter_write (output, &c);
 }
 
 /* Resets the state for all the aggregate functions. */