Add multipass procedures. Add two-pass moments calculation. Rewrite
[pspp] / src / sort.c
index c17ca2a87e8a90616436fc07fbdb03d38f2eb1fd..5f9cdb42f50e307a68980d214eabca331b0ea917 100644 (file)
@@ -25,6 +25,7 @@
 #include <errno.h>
 #include "algorithm.h"
 #include "alloc.h"
+#include "casefile.h"
 #include "command.h"
 #include "error.h"
 #include "expr.h"
@@ -35,6 +36,7 @@
 #include "var.h"
 #include "vfm.h"
 #include "vfmP.h"
+#include "workspace.h"
 
 #if HAVE_UNISTD_H
 #include <unistd.h>
@@ -53,7 +55,8 @@
 /* Other prototypes. */
 static int compare_record (const union value *, const union value *,
                            const struct sort_cases_pgm *, int *idx_to_fv);
-static int compare_case_lists (const void *, const void *, void *);
+static int compare_cases (const struct ccase *, const struct ccase *, void *);
+static int compare_case_dblptrs (const void *, const void *, void *);
 static struct internal_sort *do_internal_sort (struct sort_cases_pgm *,
                                                int separate);
 static void destroy_internal_sort (struct internal_sort *);
@@ -163,13 +166,16 @@ destroy_sort_cases_pgm (struct sort_cases_pgm *scp)
 }
 
 /* Sorts the active file based on the key variables specified in
-   global variables vars and var_cnt.  The output is either to the
-   active file, if SEPARATE is zero, or to a separate file, if
-   SEPARATE is nonzero.  In the latter case the output cases can be
-   read with a call to read_sort_output().  (In the former case the
-   output cases should be dealt with through the usual vfm interface.)
+   global variables vars and var_cnt.
 
-   The caller is responsible for freeing vars[]. */
+   If SEPARATE is zero, then output goes to the active file.  The
+   output cases can be read through the usual VFM interfaces.
+
+   If SEPARATE is nonzero, then output goes to a separate file.
+   The output cases can be read with a call to
+   read_sort_output().
+
+   The caller is responsible for freeing SCP. */
 int
 sort_cases (struct sort_cases_pgm *scp, int separate)
 {
@@ -190,9 +196,6 @@ sort_cases (struct sort_cases_pgm *scp, int separate)
     return 1; 
 
   /* Fall back to an external sort. */
-  if (vfm_source != NULL
-      && case_source_is_class (vfm_source, &storage_source_class))
-    storage_source_to_disk (vfm_source);
   scp->xsrt = do_external_sort (scp, separate);
   if (scp->xsrt != NULL) 
     return 1;
@@ -201,10 +204,12 @@ sort_cases (struct sort_cases_pgm *scp, int separate)
   return 0;
 }
 \f
-/* Results of an internal sort. */
+/* Results of an internal sort.
+   Used only for sorting to a separate file. */
 struct internal_sort 
   {
-    struct case_list **results;
+    const struct ccase **cases;
+    size_t case_cnt;
   };
 
 /* If the data is in memory, do an internal sort.  Return
@@ -215,52 +220,53 @@ do_internal_sort (struct sort_cases_pgm *scp, int separate)
   struct internal_sort *isrt;
 
   isrt = xmalloc (sizeof *isrt);
-  isrt->results = NULL;
+  isrt->cases = NULL;
+  isrt->case_cnt = 0;
 
-  if (case_source_is_class (vfm_source, &storage_source_class)
-      && !storage_source_on_disk (vfm_source))
+  if (case_source_is_class (vfm_source, &storage_source_class)) 
     {
-      struct case_list *case_list;
-      struct case_list **case_array;
-      int case_cnt;
-      int i;
+      struct casefile *casefile = storage_source_get_casefile (vfm_source);
 
-      case_cnt = vfm_source->class->count (vfm_source);
-      if (case_cnt <= 0)
-        return isrt;
-
-      if (case_cnt > get_max_workspace() / sizeof *case_array)
-        goto error;
-
-      case_list = storage_source_get_cases (vfm_source);
-      case_array = malloc (sizeof *case_array * (case_cnt + 1));
-      if (case_array == NULL)
-        goto error;
-
-      for (i = 0; case_list != NULL; i++) 
+      if (!separate)
         {
-          case_array[i] = case_list;
-          case_list = case_list->next;
+          if (!casefile_sort (casefile, compare_cases, scp))
+            goto error;
         }
-      assert (i == case_cnt);
-      case_array[case_cnt] = NULL;
+      else 
+        {
+          /* FIXME FIXME FIXME.
+             This is crap because the casefile could get flushed
+             to disk between the time we sort it and we use it
+             later, causing invalid pointer accesses.
+             The right solution is probably to extend casefiles
+             to support duplication. */
+          struct casereader *reader;
+          size_t case_idx;
+
+          if (!casefile_in_core (casefile))
+            goto error;
+          
+          isrt->case_cnt = casefile_get_case_cnt (casefile);
+          isrt->cases = workspace_malloc (sizeof *isrt->cases
+                                          * isrt->case_cnt);
+          if (isrt->cases == NULL)
+            goto error;
 
-      sort (case_array, case_cnt, sizeof *case_array,
-            compare_case_lists, scp);
+          reader = casefile_get_reader (casefile);
+          for (case_idx = 0; case_idx < isrt->case_cnt; case_idx++) 
+            {
+              casereader_read (reader, &isrt->cases[case_idx]);
+              assert (isrt->cases[case_idx] != NULL);
+            }
+          casereader_destroy (reader);
 
-      if (!separate) 
-        {
-          storage_source_set_cases (vfm_source, case_array[0]);
-          for (i = 1; i <= case_cnt; i++)
-            case_array[i - 1]->next = case_array[i]; 
-          free (case_array);
+          sort (isrt->cases, isrt->case_cnt, casefile_get_case_size (casefile),
+                compare_case_dblptrs, scp);
         }
-      else 
-        isrt->results = case_array;
-                     
+
       return isrt;
     }
-
+  
  error:
   free (isrt);
   return NULL;
@@ -272,24 +278,34 @@ destroy_internal_sort (struct internal_sort *isrt)
 {
   if (isrt != NULL) 
     {
-      free (isrt->results);
+      workspace_free (isrt->cases, sizeof *isrt->cases * isrt->case_cnt);
       free (isrt);
     }
 }
 
-/* Compares the VAR_CNT variables in VARS[] between the
-   `case_list's at A and B, and returns a strcmp()-type
-   result. */
+/* Compares the variables specified by SCP between the cases at A
+   and B, and returns a strcmp()-type result. */
 static int
-compare_case_lists (const void *a_, const void *b_, void *scp_)
+compare_cases (const struct ccase *a, const struct ccase *b,
+               void *scp_)
 {
   struct sort_cases_pgm *scp = scp_;
-  struct case_list *const *pa = a_;
-  struct case_list *const *pb = b_;
-  struct case_list *a = *pa;
-  struct case_list *b = *pb;
 
-  return compare_record (a->c.data, b->c.data, scp, NULL);
+  return compare_record (a->data, b->data, scp, NULL);
+}
+
+/* Compares the variables specified by SCP between the cases at A
+   and B, and returns a strcmp()-type result. */
+static int
+compare_case_dblptrs (const void *a_, const void *b_, void *scp_)
+{
+  struct sort_cases_pgm *scp = scp_;
+  struct ccase *const *pa = a_;
+  struct ccase *const *pb = b_;
+  struct ccase *a = *pa;
+  struct ccase *b = *pb;
+
+  return compare_record (a->data, b->data, scp, NULL);
 }
 \f
 /* External sort. */
@@ -356,6 +372,10 @@ do_external_sort (struct sort_cases_pgm *scp, int separate)
   struct external_sort *xsrt;
   int success = 0;
 
+  if (vfm_source != NULL
+      && case_source_is_class (vfm_source, &storage_source_class)) 
+    casefile_to_disk (storage_source_get_casefile (vfm_source));
+
   xsrt = xmalloc (sizeof *xsrt);
   xsrt->scp = scp;
   if (!init_external_sort (xsrt))
@@ -1344,12 +1364,11 @@ read_internal_sort_output (struct internal_sort *isrt,
                            read_sort_output_func *output_func,
                            void *aux)
 {
-  struct case_list **p;
+  size_t case_idx;
 
-  for (p = isrt->results; *p; p++)
-    if (!output_func (&(*p)->c, aux))
+  for (case_idx = 0; case_idx < isrt->case_cnt; case_idx++)
+    if (!output_func (isrt->cases[case_idx], aux))
       break;
-  free (isrt->results);
 }
 
 static void