Make it possible to pull cases from the active file with a
[pspp-builds.git] / src / language / stats / aggregate.c
index 4c85592bc58973baf0ef7ddd6aaf19e8d350455c..79d90fdd9d9fb58b6e18e0cd9c5de7a539705417 100644 (file)
@@ -164,12 +164,6 @@ static bool aggregate_single_case (struct agr_proc *agr,
                                   const struct ccase *input,
                                   struct ccase *output);
 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
-
-/* Aggregating to the active file. */
-static bool agr_to_active_file (const struct ccase *, void *aux, const struct dataset *);
-
-/* Aggregating to a system file. */
-static bool presorted_agr_to_sysfile (const struct ccase *, void *aux, const struct dataset *);
 \f
 /* Parsing. */
 
@@ -272,6 +266,8 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
   /* Output to active file or external file? */
   if (out_file == NULL) 
     {
+      struct ccase *c;
+      
       /* The active file will be replaced by the aggregated data,
          so TEMPORARY is moot. */
       proc_cancel_temporary_transformations (ds);
@@ -286,9 +282,16 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
       if (agr.sink->class->open != NULL)
         agr.sink->class->open (agr.sink);
       proc_set_sink (ds, 
-                    create_case_sink (&null_sink_class, 
-                                      dict, NULL));
-      if (!procedure (ds, agr_to_active_file, &agr))
+                    create_case_sink (&null_sink_class, dict, NULL));
+      proc_open (ds);
+      while (proc_read (ds, &c))
+        if (aggregate_single_case (&agr, c, &agr.agr_case)) 
+          if (!agr.sink->class->write (agr.sink, &agr.agr_case)) 
+            {
+              proc_close (ds);
+              goto error; 
+            }
+      if (!proc_close (ds))
         goto error;
       if (agr.case_cnt > 0) 
         {
@@ -300,8 +303,7 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
       dict_destroy (dict);
       dataset_set_dict (ds, agr.dict);
       agr.dict = NULL;
-      proc_set_source (ds, 
-                      agr.sink->class->make_source (agr.sink));
+      proc_set_source (ds, agr.sink->class->make_source (agr.sink));
       free_case_sink (agr.sink);
     }
   else
@@ -338,7 +340,17 @@ cmd_aggregate (struct lexer *lexer, struct dataset *ds)
       else 
         {
           /* Active file is already sorted. */
-          if (!procedure (ds, presorted_agr_to_sysfile, &agr))
+          struct ccase *c;
+          
+          proc_open (ds);
+          while (proc_read (ds, &c))
+            if (aggregate_single_case (&agr, c, &agr.agr_case)) 
+              if (!any_writer_write (agr.writer, &agr.agr_case)) 
+                {
+                  proc_close (ds);
+                  goto error;
+                }
+          if (!proc_close (ds))
             goto error;
         }
       
@@ -1102,31 +1114,3 @@ initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
        }
     }
 }
-\f
-/* Aggregate each case as it comes through.  Cases which aren't needed
-   are dropped.
-   Returns true if successful, false if an I/O error occurred. */
-static bool
-agr_to_active_file (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
-{
-  struct agr_proc *agr = agr_;
-
-  if (aggregate_single_case (agr, c, &agr->agr_case)) 
-    return agr->sink->class->write (agr->sink, &agr->agr_case);
-
-  return true;
-}
-
-/* Aggregate the current case and output it if we passed a
-   breakpoint. */
-static bool
-presorted_agr_to_sysfile (const struct ccase *c, void *agr_, 
-                         const struct dataset *ds UNUSED) 
-{
-  struct agr_proc *agr = agr_;
-
-  if (aggregate_single_case (agr, c, &agr->agr_case)) 
-    return any_writer_write (agr->writer, &agr->agr_case);
-  
-  return true;
-}