Redo VFM interface. Get rid of compaction_necessary, compaction_nval,
[pspp-builds.git] / src / sort.c
index b00c492fe6c88f59b55d2d98435bdd071213b9c0..5fd2eaec9e991129229f22e5a9deac913a0521a9 100644 (file)
@@ -52,7 +52,7 @@
 
 /* Other prototypes. */
 static int compare_record (const union value *, const union value *,
-                           const struct sort_cases_pgm *);
+                           const struct sort_cases_pgm *, int *idx_to_fv);
 static int compare_case_lists (const void *, const void *, void *);
 static struct internal_sort *do_internal_sort (struct sort_cases_pgm *,
                                                int separate);
@@ -76,8 +76,13 @@ cmd_sort_cases (void)
   scp = parse_sort ();
   if (scp == NULL)
     return CMD_FAILURE;
-      
-  cancel_temporary ();
+
+  if (temporary != 0)
+    {
+      msg (SE, _("SORT CASES may not be used after TEMPORARY.  "
+                 "Temporary transformations will be made permanent."));
+      cancel_temporary (); 
+    }
 
   success = sort_cases (scp, 0);
   destroy_sort_cases_pgm (scp);
@@ -170,11 +175,12 @@ destroy_sort_cases_pgm (struct sort_cases_pgm *scp)
 int
 sort_cases (struct sort_cases_pgm *scp, int separate)
 {
-  scp->case_size = sizeof (union value) * compaction_nval;
+  scp->case_size
+    = sizeof (union value) * dict_get_compacted_value_cnt (default_dict);
 
   /* Not sure this is necessary but it's good to be safe. */
   if (separate && case_source_is_class (vfm_source, &sort_source_class))
-    procedure (NULL, NULL, NULL, NULL);
+    procedure (NULL, NULL);
   
   /* SORT CASES cancels PROCESS IF. */
   expr_free (process_if_expr);
@@ -186,7 +192,9 @@ sort_cases (struct sort_cases_pgm *scp, int separate)
     return 1; 
 
   /* Fall back to an external sort. */
-  write_active_file_to_disk ();
+  if (vfm_source != NULL
+      && case_source_is_class (vfm_source, &storage_source_class))
+    storage_source_to_disk (vfm_source);
   scp->xsrt = do_external_sort (scp, separate);
   if (scp->xsrt != NULL) 
     return 1;
@@ -201,8 +209,8 @@ struct internal_sort
     struct case_list **results;
   };
 
-/* If a reasonable situation is set up, do an internal sort of the
-   data.  Return success. */
+/* If the data is in memory, do an internal sort.  Return
+   success. */
 static struct internal_sort *
 do_internal_sort (struct sort_cases_pgm *scp, int separate)
 {
@@ -211,53 +219,48 @@ do_internal_sort (struct sort_cases_pgm *scp, int separate)
   isrt = xmalloc (sizeof *isrt);
   isrt->results = NULL;
 
-  if (!case_source_is_class (vfm_source, &disk_source_class))
+  if (case_source_is_class (vfm_source, &storage_source_class)
+      && !storage_source_on_disk (vfm_source))
     {
-      if (!case_source_is_class (vfm_source, &memory_source_class))
-       procedure (NULL, NULL, NULL, NULL);
-
-      if (case_source_is_class (vfm_source, &memory_source_class))
-       {
-          struct case_list *case_list;
-          struct case_list **case_array;
-          int case_cnt;
-          int i;
+      struct case_list *case_list;
+      struct case_list **case_array;
+      int case_cnt;
+      int i;
 
-          case_cnt = vfm_source->class->count (vfm_source);
-         if (case_cnt <= 0)
-            return isrt;
+      case_cnt = vfm_source->class->count (vfm_source);
+      if (case_cnt <= 0)
+        return isrt;
 
-          if (case_cnt > set_max_workspace / sizeof *case_array)
-            goto error;
+      if (case_cnt > set_max_workspace / sizeof *case_array)
+        goto error;
 
-          case_list = memory_source_get_cases (vfm_source);
-          case_array = malloc (sizeof *case_array * (case_cnt + 1));
-          if (case_array == NULL)
-            goto error;
+      case_list = storage_source_get_cases (vfm_source);
+      case_array = malloc (sizeof *case_array * (case_cnt + 1));
+      if (case_array == NULL)
+        goto error;
 
-          for (i = 0; case_list != NULL; i++) 
-            {
-              case_array[i] = case_list;
-              case_list = case_list->next;
-            }
-          assert (i == case_cnt);
-          case_array[case_cnt] = NULL;
+      for (i = 0; case_list != NULL; i++) 
+        {
+          case_array[i] = case_list;
+          case_list = case_list->next;
+        }
+      assert (i == case_cnt);
+      case_array[case_cnt] = NULL;
 
-          sort (case_array, case_cnt, sizeof *case_array,
-                compare_case_lists, scp);
+      sort (case_array, case_cnt, sizeof *case_array,
+            compare_case_lists, scp);
 
-          if (!separate) 
-            {
-              memory_source_set_cases (vfm_source, case_array[0]);
-              for (i = 1; i <= case_cnt; i++)
-                case_array[i - 1]->next = case_array[i]; 
-              free (case_array);
-            }
-          else 
-            isrt->results = case_array;
+      if (!separate) 
+        {
+          storage_source_set_cases (vfm_source, case_array[0]);
+          for (i = 1; i <= case_cnt; i++)
+            case_array[i - 1]->next = case_array[i]; 
+          free (case_array);
+        }
+      else 
+        isrt->results = case_array;
                      
-          return isrt;
-       }
+      return isrt;
     }
 
  error:
@@ -288,7 +291,7 @@ compare_case_lists (const void *a_, const void *b_, void *scp_)
   struct case_list *a = *pa;
   struct case_list *b = *pb;
 
-  return compare_record (a->c.data, b->c.data, scp);
+  return compare_record (a->c.data, b->c.data, scp, NULL);
 }
 \f
 /* External sort. */
@@ -625,6 +628,8 @@ struct initial_run_state
   {
     struct external_sort *xsrt;
 
+    int *idx_to_fv;             /* Translation table copied from sink. */
+
     /* Reservoir. */
     struct record_run *records; /* Records arranged as a heap. */
     size_t record_cnt;          /* Current number of records. */
@@ -634,13 +639,14 @@ struct initial_run_state
     /* Run currently being output. */
     int file_idx;               /* Temporary file number. */
     size_t case_cnt;            /* Number of cases so far. */
-    size_t case_size;           /* Number of bytes in a case. */
     FILE *output_file;          /* Output file. */
     struct case_list *last_output;/* Record last output. */
 
     int okay;                   /* Zero if an error has been encountered. */
   };
 
+static const struct case_sink_class sort_sink_class;
+
 static void destroy_initial_run_state (struct initial_run_state *irs);
 static int allocate_cases (struct initial_run_state *);
 static struct case_list *grab_case (struct initial_run_state *);
@@ -650,7 +656,7 @@ static void start_run (struct initial_run_state *irs);
 static void end_run (struct initial_run_state *irs);
 static int compare_record_run (const struct record_run *,
                                const struct record_run *,
-                               struct sort_cases_pgm *);
+                               struct initial_run_state *);
 static int compare_record_run_minheap (const void *, const void *, void *);
 
 /* Writes initial runs for XSRT, sending them to a separate file
@@ -671,7 +677,6 @@ write_initial_runs (struct external_sort *xsrt, int separate)
   irs->last_output = NULL;
   irs->file_idx = 0;
   irs->case_cnt = 0;
-  irs->case_size = dict_get_case_size (default_dict);
   irs->okay = 1;
   if (!allocate_cases (irs)) 
     goto done;
@@ -681,13 +686,14 @@ write_initial_runs (struct external_sort *xsrt, int separate)
     {
       if (vfm_sink != NULL && vfm_sink->class->destroy != NULL)
        vfm_sink->class->destroy (vfm_sink);
-      vfm_sink = create_case_sink (&sort_sink_class, irs);
+      vfm_sink = create_case_sink (&sort_sink_class, default_dict, irs);
       xsrt->scp->ref_cnt++;
     }
 
   /* Create initial runs. */
   start_run (irs);
-  procedure (NULL, NULL, NULL, NULL);
+  procedure (NULL, NULL);
+  irs->idx_to_fv = NULL;
   while (irs->record_cnt > 0 && irs->okay)
     output_record (irs);
   end_run (irs);
@@ -710,18 +716,20 @@ sort_sink_write (struct case_sink *sink, const struct ccase *c)
   if (!irs->okay)
     return;
 
+  irs->idx_to_fv = sink->idx_to_fv;
+
   /* Compose record_run for this run and add to heap. */
   assert (irs->record_cnt < irs->record_cap);
   new_record_run = irs->records + irs->record_cnt++;
   new_record_run->record = grab_case (irs);
-  memcpy (new_record_run->record->c.data, c->data, irs->case_size);
+  memcpy (new_record_run->record->c.data, c->data, irs->xsrt->scp->case_size);
   new_record_run->run = irs->file_idx;
   if (irs->last_output != NULL
       && compare_record (c->data, irs->last_output->c.data,
-                         irs->xsrt->scp) < 0)
+                         irs->xsrt->scp, sink->idx_to_fv) < 0)
     new_record_run->run = irs->xsrt->next_file_idx;
   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
-             compare_record_run_minheap, irs->xsrt->scp);
+             compare_record_run_minheap, irs);
 
   /* Output a record if the reservoir is full. */
   if (irs->record_cnt == irs->record_cap && irs->okay)
@@ -752,34 +760,34 @@ destroy_initial_run_state (struct initial_run_state *irs)
     }
 
   free (irs->records);
-  free (irs);
-
   if (irs->output_file != NULL)
     close_temp_file (irs->xsrt, irs->file_idx, irs->output_file);
+
+  free (irs);
 }
 
 /* Allocates room for lots of cases as a buffer. */
 static int
 allocate_cases (struct initial_run_state *irs)
 {
-  size_t case_size;     /* Size of one case, in bytes. */
   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
   int max_cases;        /* Maximum number of cases to allocate. */
   int i;
 
   /* Allocate as many cases as we can within the workspace
      limit. */
-  case_size = dict_get_case_size (default_dict);
   approx_case_cost = (sizeof *irs->records
                       + sizeof *irs->free_list
-                      + case_size
+                      + irs->xsrt->scp->case_size
                       + 4 * sizeof (void *));
   max_cases = set_max_workspace / approx_case_cost;
   irs->records = malloc (sizeof *irs->records * max_cases);
   for (i = 0; i < max_cases; i++)
     {
       struct case_list *c;
-      c = malloc (sizeof *c + case_size - sizeof (union value));
+      c = malloc (sizeof *c
+                  + irs->xsrt->scp->case_size
+                  - sizeof (union value));
       if (c == NULL) 
         {
           max_cases = i;
@@ -808,7 +816,8 @@ allocate_cases (struct initial_run_state *irs)
    A and B, and returns a strcmp()-type result. */
 static int
 compare_record (const union value *a, const union value *b,
-                const struct sort_cases_pgm *scp)
+                const struct sort_cases_pgm *scp,
+                int *idx_to_fv)
 {
   int i;
 
@@ -818,9 +827,14 @@ compare_record (const union value *a, const union value *b,
   for (i = 0; i < scp->var_cnt; i++)
     {
       struct variable *v = scp->vars[i];
-      int fv = v->fv;
+      int fv;
       int result;
 
+      if (idx_to_fv != NULL)
+        fv = idx_to_fv[v->index];
+      else
+        fv = v->fv;
+      
       if (v->type == NUMERIC)
         {
           double af = a[fv].f;
@@ -847,21 +861,22 @@ compare_record (const union value *a, const union value *b,
 static int
 compare_record_run (const struct record_run *a,
                     const struct record_run *b,
-                    struct sort_cases_pgm *scp) 
+                    struct initial_run_state *irs)
 {
   if (a->run != b->run)
     return a->run > b->run ? 1 : -1;
   else
-    return compare_record (a->record->c.data, b->record->c.data, scp);
+    return compare_record (a->record->c.data, b->record->c.data,
+                           irs->xsrt->scp, irs->idx_to_fv);
 }
 
 /* Compares record-run tuples A and B on run number first, then
    on the current record according to SCP, but in descending
    order. */
 static int
-compare_record_run_minheap (const void *a, const void *b, void *scp
+compare_record_run_minheap (const void *a, const void *b, void *irs
 {
-  return -compare_record_run (a, b, scp);
+  return -compare_record_run (a, b, irs);
 }
 
 /* Begins a new initial run, specifically its output file. */
@@ -910,26 +925,17 @@ static void
 output_record (struct initial_run_state *irs)
 {
   struct record_run *record_run;
-  struct ccase *out_case;
   
   /* Extract minimum case from heap. */
   assert (irs->record_cnt > 0);
   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
-            compare_record_run_minheap, irs->xsrt->scp);
+            compare_record_run_minheap, irs);
   record_run = irs->records + irs->record_cnt;
 
   /* Bail if an error has occurred. */
   if (!irs->okay)
     return;
 
-  /* Obtain case data to write to disk. */
-  out_case = &record_run->record->c;
-  if (compaction_necessary)
-    {
-      compact_case (compaction_case, out_case);
-      out_case = compaction_case;
-    }
-
   /* Start new run if necessary. */
   assert (record_run->run == irs->file_idx
           || record_run->run == irs->xsrt->next_file_idx);
@@ -944,8 +950,7 @@ output_record (struct initial_run_state *irs)
   /* Write to disk. */
   if (irs->output_file != NULL
       && !write_temp_file (irs->xsrt, irs->file_idx, irs->output_file,
-                           out_case->data,
-                           sizeof *out_case->data * compaction_nval))
+                           &record_run->record->c, irs->xsrt->scp->case_size))
     irs->okay = 0;
 
   /* This record becomes last_output. */
@@ -1003,7 +1008,6 @@ static int
 merge (struct external_sort *xsrt)
 {
   struct merge_state mrg;       /* State of merge. */
-  size_t case_size;             /* Size of one case, in bytes. */
   size_t approx_case_cost;      /* Approximate memory cost of one case. */
   int max_order;                /* Maximum order of merge. */
   size_t dummy_run_cnt;         /* Number of dummy runs to insert. */
@@ -1013,15 +1017,15 @@ merge (struct external_sort *xsrt)
   mrg.xsrt = xsrt;
 
   /* Allocate as many cases as possible into cases. */
-  case_size = dict_get_case_size (default_dict);
-  approx_case_cost = sizeof *mrg.cases + case_size + 4 * sizeof (void *);
+  approx_case_cost = (sizeof *mrg.cases
+                      + xsrt->scp->case_size + 4 * sizeof (void *));
   mrg.case_cnt = set_max_workspace / approx_case_cost;
   mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
   if (mrg.cases == NULL)
     goto done;
   for (i = 0; i < mrg.case_cnt; i++) 
     {
-      mrg.cases[i] = malloc (case_size);
+      mrg.cases[i] = malloc (xsrt->scp->case_size);
       if (mrg.cases[i] == NULL) 
         {
           mrg.case_cnt = i;
@@ -1041,8 +1045,9 @@ merge (struct external_sort *xsrt)
   max_order = MAX_MERGE_ORDER;
   if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
     max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
-  else if (mrg.case_cnt / max_order * case_size < MIN_BUFFER_SIZE_BYTES)
-    max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / case_size);
+  else if (mrg.case_cnt / max_order * xsrt->scp->case_size
+           < MIN_BUFFER_SIZE_BYTES)
+    max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / xsrt->scp->case_size);
   if (max_order < 2)
     max_order = 2;
   if (max_order > xsrt->run_cnt)
@@ -1138,7 +1143,6 @@ merge_once (struct merge_state *mrg,
 {
   struct run runs[MAX_MERGE_ORDER];
   FILE *output_file = NULL;
-  size_t case_size;
   int success = 0;
   int i;
 
@@ -1177,7 +1181,6 @@ merge_once (struct merge_state *mrg,
       goto error;
 
   /* Merge. */
-  case_size = dict_get_case_size (default_dict);
   while (run_cnt > 0) 
     {
       struct run *min_run;
@@ -1187,12 +1190,13 @@ merge_once (struct merge_state *mrg,
       for (i = 1; i < run_cnt; i++)
        if (compare_record ((*runs[i].buffer_head)->data,
                             (*min_run->buffer_head)->data,
-                            mrg->xsrt->scp) < 0)
+                            mrg->xsrt->scp, NULL) < 0)
           min_run = runs + i;
 
       /* Write minimum to output file. */
       if (!write_temp_file (mrg->xsrt, min_run->file_idx, output_file,
-                            (*min_run->buffer_head)->data, case_size))
+                            (*min_run->buffer_head)->data,
+                            mrg->xsrt->scp->case_size))
         goto error;
 
       /* Remove case from buffer. */
@@ -1249,7 +1253,7 @@ fill_run_buffer (struct merge_state *mrg, struct run *run)
     {
       if (!read_temp_file (mrg->xsrt, run->file_idx, run->file,
                            (*run->buffer_tail)->data,
-                           dict_get_case_size (default_dict)))
+                           mrg->xsrt->scp->case_size))
         return 0;
 
       run->unread_case_cnt--;
@@ -1268,7 +1272,7 @@ sort_sink_make_source (struct case_sink *sink)
                              irs->xsrt->scp);
 }
 
-const struct case_sink_class sort_sink_class = 
+static const struct case_sink_class sort_sink_class = 
   {
     "SORT CASES",
     NULL,