Added abstract factory to create casefiles. Updated procedures to use
[pspp-builds.git] / src / math / sort.c
index 20d8dde73cb81a34fca282eab444d10760a300ec..d3e4446d075bc8be8a92cd003c62649e83b021f6 100644 (file)
@@ -1,6 +1,5 @@
 /* PSPP - computes sample statistics.
-   Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
-   Written by Ben Pfaff <blp@gnu.org>.
+   Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
    02110-1301, USA. */
 
 #include <config.h>
+
 #include "sort.h"
-#include <libpspp/message.h>
-#include <libpspp/alloc.h>
+
+#include <errno.h>
 #include <limits.h>
+#include <stdbool.h>
 #include <stdio.h>
 #include <stdlib.h>
-#include <errno.h>
-#include <libpspp/array.h>
-#include <stdbool.h>
+
+#include <data/case-source.h>
 #include <data/case.h>
 #include <data/casefile.h>
+#include <data/fastfile.h>
+#include <data/casefile-factory.h>
+#include <data/fastfile-factory.h>
+#include <data/procedure.h>
+#include <data/settings.h>
+#include <data/variable.h>
+#include <data/storage-stream.h>
+#include <libpspp/alloc.h>
+#include <libpspp/array.h>
+#include <libpspp/assertion.h>
+#include <libpspp/message.h>
 #include <libpspp/message.h>
-
 #include <libpspp/misc.h>
-#include <data/settings.h>
 #include <libpspp/str.h>
-#include <data/variable.h>
-#include <procedure.h>
+
+#include "minmax.h"
 
 #include "gettext.h"
 #define _(msgid) gettext (msgid)
 
-#include <libpspp/debug-print.h>
-
 /* These should only be changed for testing purposes. */
 int min_buffers = 64;
 int max_buffers = INT_MAX;
@@ -50,78 +57,103 @@ bool allow_internal_sort = true;
 static int compare_record (const struct ccase *, const struct ccase *,
                            const struct sort_criteria *);
 static struct casefile *do_internal_sort (struct casereader *,
-                                          const struct sort_criteria *);
+                                          const struct sort_criteria *,
+                                         struct casefile_factory *
+                                         );
 static struct casefile *do_external_sort (struct casereader *,
-                                          const struct sort_criteria *);
-
-/* Gets ready to sort the active file, either in-place or to a
-   separate casefile. */
-static bool
-prepare_to_sort_active_file (void) 
-{
-  bool ok;
-  
-  /* Cancel temporary transformations and PROCESS IF. */
-  if (temporary != 0)
-    cancel_temporary (); 
-  expr_free (process_if_expr);
-  process_if_expr = NULL;
-
-  /* Make sure source cases are in a storage source. */
-  ok = procedure (NULL, NULL);
-  assert (case_source_is_class (vfm_source, &storage_source_class));
+                                          const struct sort_criteria *,
+                                         struct casefile_factory *
+                                         );
 
-  return ok;
-}
 
 /* Sorts the active file in-place according to CRITERIA.
-   Returns nonzero if successful. */
-int
-sort_active_file_in_place (const struct sort_criteria *criteria) 
+   Returns true if successful. */
+bool
+sort_active_file_in_place (struct dataset *ds, 
+                          const struct sort_criteria *criteria) 
 {
-  struct casefile *src, *dst;
-  
-  if (!prepare_to_sort_active_file ())
-    return 0;
+  struct casefile *in, *out;
 
-  src = storage_source_get_casefile (vfm_source);
-  dst = sort_execute (casefile_get_destructive_reader (src), criteria);
-  free_case_source (vfm_source);
-  vfm_source = NULL;
+  proc_cancel_temporary_transformations (ds);
+  if (!procedure (ds, NULL, NULL))
+    return false;
+  
+  in = proc_capture_output (ds);
+  out = sort_execute (casefile_get_destructive_reader (in), criteria, 
+                     dataset_get_casefile_factory (ds));
+  if (out == NULL) 
+    return false;
+
+  proc_set_source (ds, storage_source_create (out));
+  return true;
+}
 
-  if (dst == NULL) 
-    return 0;
+/* Data passed to sort_to_casefile_callback(). */
+struct sort_to_casefile_cb_data 
+  {
+    const struct sort_criteria *criteria;
+    struct casefile *output;
+    struct casefile_factory *factory ;
+  };
 
-  vfm_source = storage_source_create (dst);
-  return 1;
+/* Sorts casefile CF according to the criteria in CB_DATA. */
+static bool
+sort_to_casefile_callback (const struct casefile *cf, void *cb_data_) 
+{
+  struct sort_to_casefile_cb_data *cb_data = cb_data_;
+  cb_data->output = sort_execute (casefile_get_reader (cf, NULL), 
+                                 cb_data->criteria,
+                                 cb_data->factory
+                                 );
+  return cb_data->output != NULL;
 }
 
 /* Sorts the active file to a separate casefile.  If successful,
    returns the sorted casefile.  Returns a null pointer on
    failure. */
 struct casefile *
-sort_active_file_to_casefile (const struct sort_criteria *criteria) 
+sort_active_file_to_casefile (struct dataset *ds, 
+                             const struct sort_criteria *criteria) 
 {
-  struct casefile *src;
+  struct sort_to_casefile_cb_data cb_data;
   
-  if (!prepare_to_sort_active_file ())
-    return NULL;
+  proc_cancel_temporary_transformations (ds);
 
-  src = storage_source_get_casefile (vfm_source);
-  return sort_execute (casefile_get_reader (src), criteria);
+  cb_data.criteria = criteria;
+  cb_data.output = NULL;
+  cb_data.factory = dataset_get_casefile_factory (ds);
+  if (!multipass_procedure (ds, sort_to_casefile_callback, &cb_data)) 
+    {
+      casefile_destroy (cb_data.output);
+      return NULL;
+    }
+  return cb_data.output;
 }
 
 
 /* Reads all the cases from READER, which is destroyed.  Sorts
    the cases according to CRITERIA.  Returns the sorted cases in
-   a newly created casefile. */
+   a newly created casefile, which will be created by FACTORY.
+   If FACTORY is NULL, then a local fastfile_factory will be used.
+*/
 struct casefile *
-sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
+sort_execute (struct casereader *reader,
+             const struct sort_criteria *criteria,
+             struct casefile_factory *factory
+             )
 {
-  struct casefile *output = do_internal_sort (reader, criteria);
+  struct casefile_factory *local_factory = NULL;
+  struct casefile *output ;
+  if ( factory == NULL )
+    factory = local_factory = fastfile_factory_create ();
+
+  output = do_internal_sort (reader, criteria, factory);
   if (output == NULL)
-    output = do_external_sort (reader, criteria);
+    output = do_external_sort (reader, criteria, factory);
   casereader_destroy (reader);
+
+  fastfile_factory_destroy (local_factory);
+
   return output;
 }
 \f
@@ -132,13 +164,14 @@ struct indexed_case
     unsigned long idx;  /* Index to allow for stable sorting. */
   };
 
-static int compare_indexed_cases (const void *, const void *, void *);
+static int compare_indexed_cases (const void *, const void *, const void *);
 
 /* If the data is in memory, do an internal sort and return a new
    casefile for the data.  Otherwise, return a null pointer. */
 static struct casefile *
 do_internal_sort (struct casereader *reader,
-                  const struct sort_criteria *criteria)
+                  const struct sort_criteria *criteria, 
+                 struct casefile_factory *factory)
 {
   const struct casefile *src;
   struct casefile *dst;
@@ -152,7 +185,7 @@ do_internal_sort (struct casereader *reader,
     return NULL;
       
   case_cnt = casefile_get_case_cnt (src);
-  dst = casefile_create (casefile_get_value_cnt (src));
+  dst = factory->create_casefile (factory, casefile_get_value_cnt (src));
   if (case_cnt != 0) 
     {
       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
@@ -164,7 +197,7 @@ do_internal_sort (struct casereader *reader,
             {
               bool ok = casereader_read_xfer (reader, &cases[i].c);
               if (!ok)
-                abort ();
+                NOT_REACHED ();
               cases[i].idx = i;
             }
 
@@ -174,7 +207,7 @@ do_internal_sort (struct casereader *reader,
           for (i = 0; i < case_cnt; i++)
             casefile_append_xfer (dst, &cases[i].c);
           if (casefile_error (dst))
-            abort ();
+            NOT_REACHED ();
 
           free (cases);
         }
@@ -193,9 +226,9 @@ do_internal_sort (struct casereader *reader,
    at A and B, with a "last resort" comparison for stability, and
    returns a strcmp()-type result. */
 static int
-compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
+compare_indexed_cases (const void *a_, const void *b_, const void *criteria_)
 {
-  struct sort_criteria *criteria = criteria_;
+  const struct sort_criteria *criteria = criteria_;
   const struct indexed_case *a = a_;
   const struct indexed_case *b = b_;
   int result = compare_record (&a->c, &b->c, criteria);
@@ -218,6 +251,7 @@ struct external_sort
     size_t value_cnt;                 /* Size of data in `union value's. */
     struct casefile **runs;           /* Array of initial runs. */
     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
+    struct casefile_factory *factory; /* Factory used to  create the result */
   };
 
 /* Prototypes for helper functions. */
@@ -231,7 +265,9 @@ static void destroy_external_sort (struct external_sort *);
    pattern that assures stability. */
 static struct casefile *
 do_external_sort (struct casereader *reader,
-                  const struct sort_criteria *criteria)
+                  const struct sort_criteria *criteria,
+                 struct casefile_factory *factory
+                 )
 {
   struct external_sort *xsrt;
 
@@ -244,6 +280,7 @@ do_external_sort (struct casereader *reader,
   xsrt->run_cap = 512;
   xsrt->run_cnt = 0;
   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
+  xsrt->factory = factory;
   if (write_runs (xsrt, reader))
     {
       struct casefile *output = merge (xsrt);
@@ -301,19 +338,18 @@ struct initial_run_state
     int okay;                   /* Zero if an error has been encountered. */
   };
 
-static const struct case_sink_class sort_sink_class;
-
 static bool destroy_initial_run_state (struct initial_run_state *);
-static void process_case (struct initial_run_state *, const struct ccase *,
-                          size_t);
+static void process_case (struct initial_run_state *, 
+                         const struct ccase *, size_t);
 static int allocate_cases (struct initial_run_state *);
 static void output_record (struct initial_run_state *);
 static void start_run (struct initial_run_state *);
 static void end_run (struct initial_run_state *);
 static int compare_record_run (const struct record_run *,
                                const struct record_run *,
-                               struct initial_run_state *);
-static int compare_record_run_minheap (const void *, const void *, void *);
+                               const struct initial_run_state *);
+static int compare_record_run_minheap (const void *, const void *, 
+                                      const void *);
 
 /* Reads cases from READER and composes initial runs in XSRT. */
 static int
@@ -356,7 +392,8 @@ write_runs (struct external_sort *xsrt, struct casereader *reader)
 
 /* Add a single case to an initial run. */
 static void
-process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
+process_case (struct initial_run_state *irs, const struct ccase *c, 
+             size_t idx)
 {
   struct record_run *rr;
 
@@ -455,13 +492,14 @@ compare_record (const struct ccase *a, const struct ccase *b,
       
       if (c->width == 0)
         {
-          double af = case_num (a, c->fv);
-          double bf = case_num (b, c->fv);
+          double af = case_num_idx (a, c->fv);
+          double bf = case_num_idx (b, c->fv);
           
           result = af < bf ? -1 : af > bf;
         }
       else
-        result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
+        result = memcmp (case_str_idx (a, c->fv),
+                         case_str_idx (b, c->fv), c->width);
 
       if (result != 0)
         return c->dir == SRT_ASCEND ? result : -result;
@@ -475,7 +513,7 @@ compare_record (const struct ccase *a, const struct ccase *b,
 static int
 compare_record_run (const struct record_run *a,
                     const struct record_run *b,
-                    struct initial_run_state *irs)
+                    const struct initial_run_state *irs)
 {
   int result = a->run < b->run ? -1 : a->run > b->run;
   if (result == 0)
@@ -489,7 +527,7 @@ compare_record_run (const struct record_run *a,
    on the current record according to SCP, but in descending
    order. */
 static int
-compare_record_run_minheap (const void *a, const void *b, void *irs) 
+compare_record_run_minheap (const void *a, const void *b, const void *irs) 
 {
   return -compare_record_run (a, b, irs);
 }
@@ -500,7 +538,10 @@ start_run (struct initial_run_state *irs)
 {
   irs->run++;
   irs->case_cnt = 0;
-  irs->casefile = casefile_create (irs->xsrt->value_cnt);
+
+  /* This casefile is internal to the sort, so don't use the factory
+     to create it. */
+  irs->casefile = fastfile_create (irs->xsrt->value_cnt);
   casefile_to_disk (irs->casefile);
   case_nullify (&irs->last_output); 
 }
@@ -580,7 +621,7 @@ merge (struct external_sort *xsrt)
 {
   while (xsrt->run_cnt > 1)
     {
-      int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
+      int order = MIN (MAX_MERGE_ORDER, xsrt->run_cnt);
       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
@@ -665,7 +706,7 @@ merge_once (struct external_sort *xsrt,
     }
 
   /* Create output file. */
-  output = casefile_create (xsrt->value_cnt);
+  output = xsrt->factory->create_casefile (xsrt->factory, xsrt->value_cnt);
   casefile_to_disk (output);
 
   /* Merge. */