Patch #5209
[pspp-builds.git] / src / math / sort.c
index 4d64b32474412bd3fcd1682ab0f572315fa5bc99..732c4b86555d30d7362bb0ac85d9199282235453 100644 (file)
    02110-1301, USA. */
 
 #include <config.h>
+
 #include "sort.h"
-#include <libpspp/message.h>
-#include <libpspp/alloc.h>
+
+#include <errno.h>
 #include <limits.h>
+#include <stdbool.h>
 #include <stdio.h>
 #include <stdlib.h>
-#include <errno.h>
-#include <libpspp/array.h>
-#include <stdbool.h>
+
+#include <data/case-source.h>
 #include <data/case.h>
 #include <data/casefile.h>
-#include <libpspp/message.h>
+#include <data/fastfile.h>
+#include <data/procedure.h>
+#include <data/settings.h>
+#include <data/variable.h>
+#include <data/storage-stream.h>
 #include <language/expressions/public.h>
-
+#include <libpspp/alloc.h>
+#include <libpspp/array.h>
+#include <libpspp/assertion.h>
+#include <libpspp/message.h>
+#include <libpspp/message.h>
 #include <libpspp/misc.h>
-#include <data/settings.h>
 #include <libpspp/str.h>
-#include <data/variable.h>
-#include <procedure.h>
 
 #include "gettext.h"
 #define _(msgid) gettext (msgid)
 
-#include <libpspp/debug-print.h>
-
 /* These should only be changed for testing purposes. */
 int min_buffers = 64;
 int max_buffers = INT_MAX;
@@ -55,24 +59,11 @@ static struct casefile *do_internal_sort (struct casereader *,
 static struct casefile *do_external_sort (struct casereader *,
                                           const struct sort_criteria *);
 
-/* Gets ready to sort the active file, either in-place or to a
-   separate casefile. */
-static bool
+/* Get ready to sort the active file. */
+static void
 prepare_to_sort_active_file (void) 
 {
-  bool ok;
-  
-  /* Cancel temporary transformations and PROCESS IF. */
-  if (temporary != 0)
-    cancel_temporary (); 
-  expr_free (process_if_expr);
-  process_if_expr = NULL;
-
-  /* Make sure source cases are in a storage source. */
-  ok = procedure (NULL, NULL);
-  assert (case_source_is_class (vfm_source, &storage_source_class));
-
-  return ok;
+  proc_cancel_temporary_transformations (); 
 }
 
 /* Sorts the active file in-place according to CRITERIA.
@@ -80,21 +71,35 @@ prepare_to_sort_active_file (void)
 int
 sort_active_file_in_place (const struct sort_criteria *criteria) 
 {
-  struct casefile *src, *dst;
+  struct casefile *in, *out;
+
+  prepare_to_sort_active_file ();
+  if (!procedure (NULL, NULL))
+    return 0;
   
-  if (!prepare_to_sort_active_file ())
+  in = proc_capture_output ();
+  out = sort_execute (casefile_get_destructive_reader (in), criteria);
+  if (out == NULL) 
     return 0;
 
-  src = storage_source_get_casefile (vfm_source);
-  dst = sort_execute (casefile_get_destructive_reader (src), criteria);
-  free_case_source (vfm_source);
-  vfm_source = NULL;
+  proc_set_source (storage_source_create (out));
+  return 1;
+}
 
-  if (dst == NULL) 
-    return 0;
+/* Data passed to sort_to_casefile_callback(). */
+struct sort_to_casefile_cb_data 
+  {
+    const struct sort_criteria *criteria;
+    struct casefile *output;
+  };
 
-  vfm_source = storage_source_create (dst);
-  return 1;
+/* Sorts casefile CF according to the criteria in CB_DATA. */
+static bool
+sort_to_casefile_callback (const struct casefile *cf, void *cb_data_) 
+{
+  struct sort_to_casefile_cb_data *cb_data = cb_data_;
+  cb_data->output = sort_execute (casefile_get_reader (cf), cb_data->criteria);
+  return cb_data->output != NULL;
 }
 
 /* Sorts the active file to a separate casefile.  If successful,
@@ -103,13 +108,18 @@ sort_active_file_in_place (const struct sort_criteria *criteria)
 struct casefile *
 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
 {
-  struct casefile *src;
+  struct sort_to_casefile_cb_data cb_data;
   
-  if (!prepare_to_sort_active_file ())
-    return NULL;
+  prepare_to_sort_active_file ();
 
-  src = storage_source_get_casefile (vfm_source);
-  return sort_execute (casefile_get_reader (src), criteria);
+  cb_data.criteria = criteria;
+  cb_data.output = NULL;
+  if (!multipass_procedure (sort_to_casefile_callback, &cb_data)) 
+    {
+      casefile_destroy (cb_data.output);
+      return NULL;
+    }
+  return cb_data.output;
 }
 
 
@@ -153,7 +163,7 @@ do_internal_sort (struct casereader *reader,
     return NULL;
       
   case_cnt = casefile_get_case_cnt (src);
-  dst = casefile_create (casefile_get_value_cnt (src));
+  dst = fastfile_create (casefile_get_value_cnt (src));
   if (case_cnt != 0) 
     {
       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
@@ -165,7 +175,7 @@ do_internal_sort (struct casereader *reader,
             {
               bool ok = casereader_read_xfer (reader, &cases[i].c);
               if (!ok)
-                abort ();
+                NOT_REACHED ();
               cases[i].idx = i;
             }
 
@@ -175,7 +185,7 @@ do_internal_sort (struct casereader *reader,
           for (i = 0; i < case_cnt; i++)
             casefile_append_xfer (dst, &cases[i].c);
           if (casefile_error (dst))
-            abort ();
+            NOT_REACHED ();
 
           free (cases);
         }
@@ -302,8 +312,6 @@ struct initial_run_state
     int okay;                   /* Zero if an error has been encountered. */
   };
 
-static const struct case_sink_class sort_sink_class;
-
 static bool destroy_initial_run_state (struct initial_run_state *);
 static void process_case (struct initial_run_state *, const struct ccase *,
                           size_t);
@@ -501,7 +509,7 @@ start_run (struct initial_run_state *irs)
 {
   irs->run++;
   irs->case_cnt = 0;
-  irs->casefile = casefile_create (irs->xsrt->value_cnt);
+  irs->casefile = fastfile_create (irs->xsrt->value_cnt);
   casefile_to_disk (irs->casefile);
   case_nullify (&irs->last_output); 
 }
@@ -666,7 +674,7 @@ merge_once (struct external_sort *xsrt,
     }
 
   /* Create output file. */
-  output = casefile_create (xsrt->value_cnt);
+  output = fastfile_create (xsrt->value_cnt);
   casefile_to_disk (output);
 
   /* Merge. */