Got rid of "struct long_vec", envector(), devector(), etc.
[pspp] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include <assert.h>
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "approx.h"
32 #include "do-ifP.h"
33 #include "error.h"
34 #include "expr.h"
35 #include "misc.h"
36 #include "random.h"
37 #include "som.h"
38 #include "str.h"
39 #include "tab.h"
40 #include "var.h"
41 #include "value-labels.h"
42
43 /*
44    Virtual File Manager (vfm):
45
46    vfm is used to process data files.  It uses the model that data is
47    read from one stream (the data source), then written to another
48    (the data sink).  The data source is then deleted and the data sink
49    becomes the data source for the next procedure. */
50
51 #include "debug-print.h"
52
53 /* Procedure execution data. */
54 struct write_case_data
55   {
56     void (*beginfunc) (void *);
57     int (*procfunc) (struct ccase *, void *);
58     void (*endfunc) (void *);
59     void *aux;
60   };
61
62 /* This is used to read from the active file. */
63 struct case_stream *vfm_source;
64
65 /* This is used to write to the replacement active file. */
66 struct case_stream *vfm_sink;
67
68 /* Information about the data source. */
69 struct stream_info vfm_source_info;
70
71 /* Information about the data sink. */
72 struct stream_info vfm_sink_info;
73
74 /* Filter variable and  `value' index. */
75 static struct variable *filter_var;
76 static int filter_index;
77
78 #define FILTERED                                                        \
79         (filter_index != -1                                             \
80          && (temp_case->data[filter_index].f == 0.0                     \
81              || temp_case->data[filter_index].f == SYSMIS               \
82              || is_num_user_missing (temp_case->data[filter_index].f,   \
83                                      filter_var)))
84
85 /* Nonzero if the case needs to have values deleted before being
86    stored, zero otherwise. */
87 int compaction_necessary;
88
89 /* Number of values after compaction, or the same as
90    vfm_sink_info.nval, if compaction is not necessary. */
91 int compaction_nval;
92
93 /* Temporary case buffer with enough room for `compaction_nval'
94    `value's. */
95 struct ccase *compaction_case;
96
97 /* Within a session, when paging is turned on, it is never turned back
98    off.  This policy might be too aggressive. */
99 static int paging = 0;
100
101 /* Time at which vfm was last invoked. */
102 time_t last_vfm_invocation;
103
104 /* Number of cases passed to proc_func(). */
105 static int case_count;
106
107 /* Lag queue. */
108 int n_lag;                      /* Number of cases to lag. */
109 static int lag_count;           /* Number of cases in lag_queue so far. */
110 static int lag_head;            /* Index where next case will be added. */
111 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
112
113 static void open_active_file (void);
114 static void close_active_file (struct write_case_data *);
115 static int SPLIT_FILE_procfunc (struct ccase *, void *);
116 static void finish_compaction (void);
117 static void lag_case (void);
118 static int procedure_write_case (struct write_case_data *);
119 static void clear_temp_case (void);
120 \f
121 /* Public functions. */
122
123 /* Reads all the cases from the active file, transforms them by
124    the active set of transformations, calls PROCFUNC with CURCASE
125    set to the case , and writes them to a new active file.
126
127    Divides the active file into zero or more series of one or more
128    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
129    called after each series.
130
131    Arbitrary user-specified data AUX is passed to BEGINFUNC,
132    PROCFUNC, and ENDFUNC as auxiliary data. */
133 void
134 procedure (void (*beginfunc) (void *),
135            int (*procfunc) (struct ccase *curcase, void *),
136            void (*endfunc) (void *),
137            void *aux)
138 {
139   struct write_case_data procedure_write_data;
140   struct write_case_data split_file_data;
141
142   if (dict_get_split_cnt (default_dict) == 0) 
143     {
144       /* Normally we just use the data passed by the user. */
145       procedure_write_data.beginfunc = beginfunc;
146       procedure_write_data.procfunc = procfunc;
147       procedure_write_data.endfunc = endfunc;
148       procedure_write_data.aux = aux;
149     }
150   else
151     {
152       /* Under SPLIT FILE, we add a layer of indirection. */
153       procedure_write_data.beginfunc = NULL;
154       procedure_write_data.procfunc = SPLIT_FILE_procfunc;
155       procedure_write_data.endfunc = endfunc;
156       procedure_write_data.aux = &split_file_data;
157
158       split_file_data.beginfunc = beginfunc;
159       split_file_data.procfunc = procfunc;
160       split_file_data.endfunc = endfunc;
161       split_file_data.aux = aux;
162     }
163
164   last_vfm_invocation = time (NULL);
165
166   open_active_file ();
167   vfm_source->read (procedure_write_case, &procedure_write_data);
168   close_active_file (&procedure_write_data);
169 }
170 \f
171 /* Active file processing support.  Subtly different semantics from
172    procedure(). */
173
174 static int process_active_file_write_case (struct write_case_data *data);
175
176 /* The casefunc might want us to stop calling it. */
177 static int not_canceled;
178
179 /* Reads all the cases from the active file and passes them one-by-one
180    to CASEFUNC in temp_case.  Before any cases are passed, calls
181    BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
182    BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
183    file by calling process_active_file_output_case().
184
185    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
186 void
187 process_active_file (void (*beginfunc) (void *),
188                      int (*casefunc) (struct ccase *curcase, void *),
189                      void (*endfunc) (void *),
190                      void *aux)
191 {
192   struct write_case_data process_active_write_data;
193
194   process_active_write_data.beginfunc = beginfunc;
195   process_active_write_data.procfunc = casefunc;
196   process_active_write_data.endfunc = endfunc;
197   process_active_write_data.aux = aux;
198
199   not_canceled = 1;
200
201   open_active_file ();
202   beginfunc (aux);
203   
204   /* There doesn't necessarily need to be an active file. */
205   if (vfm_source)
206     vfm_source->read (process_active_file_write_case,
207                       &process_active_write_data);
208   
209   endfunc (aux);
210   close_active_file (&process_active_write_data);
211 }
212
213 /* Pass the current case to casefunc. */
214 static int
215 process_active_file_write_case (struct write_case_data *data)
216 {
217   /* Index of current transformation. */
218   int cur_trns;
219
220   for (cur_trns = f_trns ; cur_trns != temp_trns; )
221     {
222       int code;
223         
224       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
225       switch (code)
226         {
227         case -1:
228           /* Next transformation. */
229           cur_trns++;
230           break;
231         case -2:
232           /* Delete this case. */
233           goto done;
234         default:
235           /* Go to that transformation. */
236           cur_trns = code;
237           break;
238         }
239     }
240
241   if (n_lag)
242     lag_case ();
243           
244   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
245   if (not_canceled
246       && !FILTERED
247       && (process_if_expr == NULL ||
248           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
249     not_canceled = data->procfunc (temp_case, data->aux);
250   
251   case_count++;
252   
253  done:
254   clear_temp_case ();
255
256   return 1;
257 }
258
259 /* Write temp_case to the active file. */
260 void
261 process_active_file_output_case (void)
262 {
263   vfm_sink_info.ncases++;
264   vfm_sink->write ();
265 }
266 \f
267 /* Opening the active file. */
268
269 /* It might be usefully noted that the following several functions are
270    given in the order that they are called by open_active_file(). */
271
272 /* Prepare to write to the replacement active file. */
273 static void
274 prepare_for_writing (void)
275 {
276   /* FIXME: If ALL the conditions listed below hold true, then the
277      replacement active file is guaranteed to be identical to the
278      original active file:
279
280      1. TEMPORARY was the first transformation, OR, there were no
281      transformations at all.
282
283      2. Input is not coming from an input program.
284
285      3. Compaction is not necessary.
286
287      So, in this case, we shouldn't have to replace the active
288      file--it's just a waste of time and space. */
289
290   vfm_sink_info.ncases = 0;
291   vfm_sink_info.nval = dict_get_value_cnt (default_dict);
292   vfm_sink_info.case_size = (sizeof (struct ccase)
293                              + ((dict_get_value_cnt (default_dict) - 1)
294                                 * sizeof (union value)));
295   
296   if (vfm_sink == NULL)
297     {
298       if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
299           && !paging)
300         {
301           msg (MW, _("Workspace overflow predicted.  Max workspace is "
302                      "currently set to %d KB (%d cases at %d bytes each).  "
303                      "Paging active file to disk."),
304                MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
305                vfm_sink_info.case_size);
306           
307           paging = 1;
308         }
309       
310       vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
311     }
312 }
313
314 /* Arrange for compacting the output cases for storage. */
315 static void
316 arrange_compaction (void)
317 {
318   int count_values = 0;
319
320   {
321     int i;
322     
323     /* Count up the number of `value's that will be output. */
324     for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
325       {
326         struct variable *v = dict_get_var (temp_dict, i);
327
328         if (v->name[0] != '#')
329           {
330             assert (v->nv > 0);
331             count_values += v->nv;
332           } 
333       }
334     assert (temporary == 2 || count_values <= dict_get_value_cnt (temp_dict));
335   }
336   
337   /* Compaction is only necessary if the number of `value's to output
338      differs from the number already present. */
339   compaction_nval = count_values;
340   compaction_necessary = (temporary == 2
341                           || count_values != dict_get_value_cnt (temp_dict));
342   
343   if (vfm_sink->init)
344     vfm_sink->init ();
345 }
346
347 /* Prepares the temporary case and compaction case. */
348 static void
349 make_temp_case (void)
350 {
351   temp_case = xmalloc (vfm_sink_info.case_size);
352
353   if (compaction_necessary)
354     compaction_case = xmalloc (sizeof (struct ccase)
355                                + sizeof (union value) * (compaction_nval - 1));
356 }
357
358 #if DEBUGGING
359 /* Returns the name of the variable that owns the index CCASE_INDEX
360    into ccase. */
361 static const char *
362 index_to_varname (int ccase_index)
363 {
364   int i;
365
366   for (i = 0; i < default_dict.nvar; i++)
367     {
368       struct variable *v = default_dict.var[i];
369       
370       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
371         return default_dict.var[i]->name;
372     }
373   return _("<NOVAR>");
374 }
375 #endif
376
377 /* Initializes temp_case from the vectors that say which `value's
378    need to be initialized just once, and which ones need to be
379    re-initialized before every case. */
380 static void
381 vector_initialization (void)
382 {
383   size_t var_cnt = dict_get_var_cnt (default_dict);
384   size_t i;
385   
386   for (i = 0; i < var_cnt; i++) 
387     {
388       struct variable *v = dict_get_var (default_dict, i);
389
390       if (v->type == NUMERIC) 
391         {
392           if (v->reinit)
393             temp_case->data[v->fv].f = 0.0;
394           else
395             temp_case->data[v->fv].f = SYSMIS;
396         }
397       else
398         memset (temp_case->data[v->fv].s, ' ', v->width);
399     }
400 }
401
402 /* Sets filter_index to an appropriate value. */
403 static void
404 setup_filter (void)
405 {
406   filter_var = dict_get_filter (default_dict);
407   
408   if (filter_var != NULL)
409     {
410       assert (filter_var->type == NUMERIC);
411       filter_index = filter_var->index;
412     } else {
413       filter_index = -1;
414     }
415 }
416
417 /* Sets all the lag-related variables based on value of n_lag. */
418 static void
419 setup_lag (void)
420 {
421   int i;
422   
423   if (n_lag == 0)
424     return;
425
426   lag_count = 0;
427   lag_head = 0;
428   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
429   for (i = 0; i < n_lag; i++)
430     lag_queue[i] = xmalloc (dict_get_value_cnt (temp_dict)
431                             * sizeof **lag_queue);
432 }
433
434 /* There is a lot of potential confusion in the vfm and related
435    routines over the number of `value's at each stage of the process.
436    Here is each nval count, with explanation, as set up by
437    open_active_file():
438
439    vfm_source_info.nval: Number of `value's in the cases returned by
440    the source stream.  This value turns out not to be very useful, but
441    we maintain it anyway.
442
443    vfm_sink_info.nval: Number of `value's in the cases after all
444    transformations have been performed.  Never less than
445    vfm_source_info.nval.
446
447    temp_dict->nval: Number of `value's in the cases after the
448    transformations leading up to TEMPORARY have been performed.  If
449    TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
450    Never less than vfm_sink_info.nval.
451
452    compaction_nval: Number of `value's in the cases after the
453    transformations leading up to TEMPORARY have been performed and the
454    case has been compacted by compact_case(), if compaction is
455    necessary.  This the number of `value's in the cases saved by the
456    sink stream.  (However, note that the cases passed to the sink
457    stream have not yet been compacted.  It is the responsibility of
458    the data sink to call compact_case().)  This may be less than,
459    greater than, or equal to vfm_source_info.nval.  `compaction'
460    becomes the new value of default_dict.nval after the procedure is
461    completed.
462
463    default_dict.nval: This is often an alias for temp_dict->nval.  As
464    such it can really have no separate existence until the procedure
465    is complete.  For this reason it should *not* be referenced inside
466    the execution of a procedure. */
467 /* Makes all preparations for reading from the data source and writing
468    to the data sink. */
469 static void
470 open_active_file (void)
471 {
472   /* Sometimes we want to refer to the dictionary that applies to the
473      data actually written to the sink.  This is either temp_dict or
474      default_dict.  However, if TEMPORARY is not on, then temp_dict
475      does not apply.  So, we can set temp_dict to default_dict in this
476      case. */
477   if (!temporary)
478     {
479       temp_trns = n_trns;
480       temp_dict = default_dict;
481     }
482
483   /* No cases passed to the procedure yet. */
484   case_count = 0;
485
486   /* The rest. */
487   prepare_for_writing ();
488   arrange_compaction ();
489   make_temp_case ();
490   vector_initialization ();
491   discard_ctl_stack ();
492   setup_filter ();
493   setup_lag ();
494
495   /* Debug output. */
496   debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
497                  vfm_source->name, vfm_sink->name));
498   debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
499                  "temp_dict->nval=%d, compaction_nval=%d, "
500                  "default_dict.nval=%d\n",
501                  vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
502                  compaction_nval, default_dict.nval));
503 }
504 \f
505 /* Closes the active file. */
506 static void
507 close_active_file (struct write_case_data *data)
508 {
509   /* Close the current case group. */
510   if (case_count && data->endfunc != NULL)
511     data->endfunc (data->aux);
512
513   /* Stop lagging (catch up?). */
514   if (n_lag)
515     {
516       int i;
517       
518       for (i = 0; i < n_lag; i++)
519         free (lag_queue[i]);
520       free (lag_queue);
521       n_lag = 0;
522     }
523   
524   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
525      off TEMPORARY. */
526   if (temporary)
527     {
528       dict_destroy (default_dict);
529       default_dict = temp_dict;
530       temp_dict = NULL;
531     }
532
533   /* Finish compaction. */
534   if (compaction_necessary)
535     finish_compaction ();
536     
537   /* Old data sink --> New data source. */
538   if (vfm_source && vfm_source->destroy_source)
539     vfm_source->destroy_source ();
540   
541   vfm_source = vfm_sink;
542   vfm_source_info.ncases = vfm_sink_info.ncases;
543   vfm_source_info.nval = compaction_nval;
544   vfm_source_info.case_size = (sizeof (struct ccase)
545                                + (compaction_nval - 1) * sizeof (union value));
546   if (vfm_source->mode)
547     vfm_source->mode ();
548
549   /* Old data sink is gone now. */
550   vfm_sink = NULL;
551
552   /* Cancel TEMPORARY. */
553   cancel_temporary ();
554
555   /* Free temporary cases. */
556   free (temp_case);
557   temp_case = NULL;
558
559   free (compaction_case);
560   compaction_case = NULL;
561
562   /* Cancel PROCESS IF. */
563   expr_free (process_if_expr);
564   process_if_expr = NULL;
565
566   /* Cancel FILTER if temporary. */
567   if (filter_var != NULL && !FILTER_before_TEMPORARY)
568     dict_set_filter (default_dict, NULL);
569
570   /* Cancel transformations. */
571   cancel_transformations ();
572
573   /* Turn off case limiter. */
574   dict_set_case_limit (default_dict, 0);
575
576   /* Clear VECTOR vectors. */
577   dict_clear_vectors (default_dict);
578
579   debug_printf (("vfm: procedure complete\n\n"));
580 }
581 \f
582 /* Disk case stream. */
583
584 /* Associated files. */
585 FILE *disk_source_file;
586 FILE *disk_sink_file;
587
588 /* Initializes the disk sink. */
589 static void
590 disk_stream_init (void)
591 {
592   disk_sink_file = tmpfile ();
593   if (!disk_sink_file)
594     {
595       msg (ME, _("An error occurred attempting to create a temporary "
596                  "file for use as the active file: %s."),
597            strerror (errno));
598       err_failure ();
599     }
600 }
601
602 /* Reads all cases from the disk source and passes them one by one to
603    write_case(). */
604 static void
605 disk_stream_read (write_case_func *write_case, write_case_data wc_data)
606 {
607   int i;
608
609   for (i = 0; i < vfm_source_info.ncases; i++)
610     {
611       if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
612         {
613           msg (ME, _("An error occurred while attempting to read from "
614                "a temporary file created for the active file: %s."),
615                strerror (errno));
616           err_failure ();
617           return;
618         }
619
620       if (!write_case (wc_data))
621         return;
622     }
623 }
624
625 /* Writes temp_case to the disk sink. */
626 static void
627 disk_stream_write (void)
628 {
629   union value *src_case;
630
631   if (compaction_necessary)
632     {
633       compact_case (compaction_case, temp_case);
634       src_case = (union value *) compaction_case;
635     }
636   else src_case = (union value *) temp_case;
637
638   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
639               disk_sink_file) != 1)
640     {
641       msg (ME, _("An error occurred while attempting to write to a "
642                  "temporary file used as the active file: %s."),
643            strerror (errno));
644       err_failure ();
645     }
646 }
647
648 /* Switches the stream from a sink to a source. */
649 static void
650 disk_stream_mode (void)
651 {
652   /* Rewind the sink. */
653   if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
654     {
655       msg (ME, _("An error occurred while attempting to rewind a "
656                  "temporary file used as the active file: %s."),
657            strerror (errno));
658       err_failure ();
659     }
660   
661   /* Sink --> source variables. */
662   disk_source_file = disk_sink_file;
663 }
664
665 /* Destroys the source's internal data. */
666 static void
667 disk_stream_destroy_source (void)
668 {
669   if (disk_source_file)
670     {
671       fclose (disk_source_file);
672       disk_source_file = NULL;
673     }
674 }
675
676 /* Destroys the sink's internal data. */
677 static void
678 disk_stream_destroy_sink (void)
679 {
680   if (disk_sink_file)
681     {
682       fclose (disk_sink_file);
683       disk_sink_file = NULL;
684     }
685 }
686
687 /* Disk stream. */
688 struct case_stream vfm_disk_stream = 
689   {
690     disk_stream_init,
691     disk_stream_read,
692     disk_stream_write,
693     disk_stream_mode,
694     disk_stream_destroy_source,
695     disk_stream_destroy_sink,
696     "disk",
697   };
698 \f
699 /* Memory case stream. */
700
701 /* List of cases stored in the stream. */
702 struct case_list *memory_source_cases;
703 struct case_list *memory_sink_cases;
704
705 /* Current case. */
706 struct case_list *memory_sink_iter;
707
708 /* Maximum number of cases. */
709 int memory_sink_max_cases;
710
711 /* Initializes the memory stream variables for writing. */
712 static void
713 memory_stream_init (void)
714 {
715   memory_sink_cases = NULL;
716   memory_sink_iter = NULL;
717   
718   assert (compaction_nval);
719   memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
720 }
721
722 /* Reads the case stream from memory and passes it to write_case(). */
723 static void
724 memory_stream_read (write_case_func *write_case, write_case_data wc_data)
725 {
726   while (memory_source_cases != NULL)
727     {
728       memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
729       
730       {
731         struct case_list *current = memory_source_cases;
732         memory_source_cases = memory_source_cases->next;
733         free (current);
734       }
735       
736       if (!write_case (wc_data))
737         return;
738     }
739 }
740
741 /* Writes temp_case to the memory stream. */
742 static void
743 memory_stream_write (void)
744 {
745   struct case_list *new_case = malloc (sizeof (struct case_list)
746                                        + ((compaction_nval - 1)
747                                           * sizeof (union value)));
748
749   /* If we've got memory to spare then add it to the linked list. */
750   if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
751     {
752       if (compaction_necessary)
753         compact_case (&new_case->c, temp_case);
754       else
755         memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
756
757       /* Append case to linked list. */
758       if (memory_sink_cases)
759         memory_sink_iter = memory_sink_iter->next = new_case;
760       else
761         memory_sink_iter = memory_sink_cases = new_case;
762     }
763   else
764     {
765       /* Out of memory.  Write the active file to disk. */
766       struct case_list *cur, *next;
767
768       /* Notify the user. */
769       if (!new_case)
770         msg (MW, _("Virtual memory exhausted.  Paging active file "
771                    "to disk."));
772       else
773         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
774                    "overflowed.  Paging active file to disk."),
775              MAX_WORKSPACE / 1024, memory_sink_max_cases,
776              compaction_nval * sizeof (union value));
777
778       free (new_case);
779
780       /* Switch to a disk sink. */
781       vfm_sink = &vfm_disk_stream;
782       vfm_sink->init ();
783       paging = 1;
784
785       /* Terminate the list. */
786       if (memory_sink_iter)
787         memory_sink_iter->next = NULL;
788
789       /* Write the cases to disk and destroy them.  We can't call
790          vfm->sink->write() because of compaction. */
791       for (cur = memory_sink_cases; cur; cur = next)
792         {
793           next = cur->next;
794           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
795                       disk_sink_file) != 1)
796             {
797               msg (ME, _("An error occurred while attempting to "
798                          "write to a temporary file created as the "
799                          "active file, while paging to disk: %s."),
800                    strerror (errno));
801               err_failure ();
802             }
803           free (cur);
804         }
805
806       /* Write the current case to disk. */
807       vfm_sink->write ();
808     }
809 }
810
811 /* If the data is stored in memory, causes it to be written to disk.
812    To be called only *between* procedure()s, not within them. */
813 void
814 page_to_disk (void)
815 {
816   if (vfm_source == &vfm_memory_stream)
817     {
818       /* Switch to a disk sink. */
819       vfm_sink = &vfm_disk_stream;
820       vfm_sink->init ();
821       paging = 1;
822       
823       /* Write the cases to disk and destroy them.  We can't call
824          vfm->sink->write() because of compaction. */
825       {
826         struct case_list *cur, *next;
827         
828         for (cur = memory_source_cases; cur; cur = next)
829           {
830             next = cur->next;
831             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
832                         disk_sink_file) != 1)
833               {
834                 msg (ME, _("An error occurred while attempting to "
835                            "write to a temporary file created as the "
836                            "active file, while paging to disk: %s."),
837                      strerror (errno));
838                 err_failure ();
839               }
840             free (cur);
841           }
842       }
843       
844       vfm_source = &vfm_disk_stream;
845       vfm_source->mode ();
846
847       vfm_sink = NULL;
848     }
849 }
850
851 /* Switch the memory stream from sink to source mode. */
852 static void
853 memory_stream_mode (void)
854 {
855   /* Terminate the list. */
856   if (memory_sink_iter)
857     memory_sink_iter->next = NULL;
858
859   /* Sink --> source variables. */
860   memory_source_cases = memory_sink_cases;
861   memory_sink_cases = NULL;
862 }
863
864 /* Destroy all memory source data. */
865 static void
866 memory_stream_destroy_source (void)
867 {
868   struct case_list *cur, *next;
869   
870   for (cur = memory_source_cases; cur; cur = next)
871     {
872       next = cur->next;
873       free (cur);
874     }
875   memory_source_cases = NULL;
876 }
877
878 /* Destroy all memory sink data. */
879 static void
880 memory_stream_destroy_sink (void)
881 {
882   struct case_list *cur, *next;
883   
884   for (cur = memory_sink_cases; cur; cur = next)
885     {
886       next = cur->next;
887       free (cur);
888     }
889   memory_sink_cases = NULL;
890 }
891   
892 /* Memory stream. */
893 struct case_stream vfm_memory_stream = 
894   {
895     memory_stream_init,
896     memory_stream_read,
897     memory_stream_write,
898     memory_stream_mode,
899     memory_stream_destroy_source,
900     memory_stream_destroy_sink,
901     "memory",
902   };
903 \f
904 #include "debug-print.h"
905
906 /* Add temp_case to the lag queue. */
907 static void
908 lag_case (void)
909 {
910   if (lag_count < n_lag)
911     lag_count++;
912   memcpy (lag_queue[lag_head], temp_case,
913           sizeof (union value) * dict_get_value_cnt (temp_dict));
914   if (++lag_head >= n_lag)
915     lag_head = 0;
916 }
917
918 /* Returns a pointer to the lagged case from N_BEFORE cases before the
919    current one, or NULL if there haven't been that many cases yet. */
920 struct ccase *
921 lagged_case (int n_before)
922 {
923   assert (n_before <= n_lag);
924   if (n_before > lag_count)
925     return NULL;
926   
927   {
928     int index = lag_head - n_before;
929     if (index < 0)
930       index += n_lag;
931     return lag_queue[index];
932   }
933 }
934    
935 /* Transforms temp_case and writes it to the replacement active file
936    if advisable.  Returns nonzero if more cases can be accepted, zero
937    otherwise.  Do not call this function again after it has returned
938    zero once.  */
939 int
940 procedure_write_case (write_case_data wc_data)
941 {
942   /* Index of current transformation. */
943   int cur_trns;
944
945   /* Return value: whether it's reasonable to write any more cases. */
946   int more_cases = 1;
947
948   debug_printf ((_("transform: ")));
949
950   cur_trns = f_trns;
951   for (;;)
952     {
953       /* Output the case if this is temp_trns. */
954       if (cur_trns == temp_trns)
955         {
956           debug_printf (("REC"));
957
958           if (n_lag)
959             lag_case ();
960           
961           vfm_sink_info.ncases++;
962           vfm_sink->write ();
963
964           if (dict_get_case_limit (default_dict))
965             more_cases = (vfm_sink_info.ncases
966                           < dict_get_case_limit (default_dict));
967         }
968
969       /* Are we done? */
970       if (cur_trns >= n_trns)
971         break;
972       
973       debug_printf (("$%d", cur_trns));
974
975       /* Decide which transformation should come next. */
976       {
977         int code;
978         
979         code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
980         switch (code)
981           {
982           case -1:
983             /* Next transformation. */
984             cur_trns++;
985             break;
986           case -2:
987             /* Delete this case. */
988             goto done;
989           default:
990             /* Go to that transformation. */
991             cur_trns = code;
992             break;
993           }
994       }
995     }
996
997   /* Call the beginning of group function. */
998   if (!case_count && wc_data->beginfunc != NULL)
999     wc_data->beginfunc (wc_data->aux);
1000
1001   /* Call the procedure if there is one and FILTER and PROCESS IF
1002      don't prohibit it. */
1003   if (wc_data->procfunc != NULL
1004       && !FILTERED
1005       && (process_if_expr == NULL ||
1006           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
1007     wc_data->procfunc (temp_case, wc_data->aux);
1008
1009   case_count++;
1010   
1011 done:
1012   debug_putc ('\n', stdout);
1013
1014   clear_temp_case ();
1015   
1016   /* Return previously determined value. */
1017   return more_cases;
1018 }
1019
1020 /* Clears the variables in the temporary case that need to be
1021    cleared between processing cases.  */
1022 static void
1023 clear_temp_case (void)
1024 {
1025   /* FIXME?  This is linear in the number of variables, but
1026      doesn't need to be, so it's an easy optimization target. */
1027   size_t var_cnt = dict_get_var_cnt (default_dict);
1028   size_t i;
1029   
1030   for (i = 0; i < var_cnt; i++) 
1031     {
1032       struct variable *v = dict_get_var (default_dict, i);
1033       if (v->init && v->reinit) 
1034         {
1035           if (v->type == NUMERIC) 
1036             temp_case->data[v->fv].f = SYSMIS;
1037           else
1038             memset (temp_case->data[v->fv].s, ' ', v->width);
1039         } 
1040     }
1041 }
1042
1043 /* Appends TRNS to t_trns[], the list of all transformations to be
1044    performed on data as it is read from the active file. */
1045 void
1046 add_transformation (struct trns_header * trns)
1047 {
1048   if (n_trns >= m_trns)
1049     {
1050       m_trns += 16;
1051       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1052     }
1053   t_trns[n_trns] = trns;
1054   trns->index = n_trns++;
1055 }
1056
1057 /* Cancels all active transformations, including any transformations
1058    created by the input program. */
1059 void
1060 cancel_transformations (void)
1061 {
1062   int i;
1063   for (i = 0; i < n_trns; i++)
1064     {
1065       if (t_trns[i]->free)
1066         t_trns[i]->free (t_trns[i]);
1067       free (t_trns[i]);
1068     }
1069   n_trns = f_trns = 0;
1070   if (m_trns > 32)
1071     {
1072       free (t_trns);
1073       m_trns = 0;
1074     }
1075 }
1076
1077 /* Dumps out the values of all the split variables for the case C. */
1078 static void
1079 dump_splits (struct ccase *c)
1080 {
1081   struct variable *const *split;
1082   struct tab_table *t;
1083   size_t split_cnt;
1084   int i;
1085
1086   split_cnt = dict_get_split_cnt (default_dict);
1087   t = tab_create (3, split_cnt + 1, 0);
1088   tab_dim (t, tab_natural_dimensions);
1089   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
1090   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
1091   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1092   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1093   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1094   split = dict_get_split_vars (default_dict);
1095   for (i = 0; i < split_cnt; i++)
1096     {
1097       struct variable *v = split[i];
1098       char temp_buf[80];
1099       const char *val_lab;
1100
1101       assert (v->type == NUMERIC || v->type == ALPHA);
1102       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1103       
1104       {
1105         union value val = c->data[v->fv];
1106         if (v->type == ALPHA)
1107           val.c = c->data[v->fv].s;
1108         data_out (temp_buf, &v->print, &val);
1109       }
1110       
1111       temp_buf[v->print.w] = 0;
1112       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1113
1114       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1115       if (val_lab)
1116         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1117     }
1118   tab_flags (t, SOMF_NO_TITLE);
1119   tab_submit (t);
1120 }
1121
1122 /* This procfunc is substituted for the user-supplied procfunc when
1123    SPLIT FILE is active.  This function forms a wrapper around that
1124    procfunc by dividing the input into series. */
1125 static int
1126 SPLIT_FILE_procfunc (struct ccase *c, void *data_)
1127 {
1128   struct write_case_data *data = data_;
1129   static struct ccase *prev_case;
1130   struct variable *const *split;
1131   size_t split_cnt;
1132   size_t i;
1133
1134   /* The first case always begins a new series.  We also need to
1135      preserve the values of the case for later comparison. */
1136   if (case_count == 0)
1137     {
1138       if (prev_case)
1139         free (prev_case);
1140       prev_case = xmalloc (vfm_sink_info.case_size);
1141       memcpy (prev_case, c, vfm_sink_info.case_size);
1142
1143       dump_splits (c);
1144       if (data->beginfunc != NULL)
1145         data->beginfunc (data->aux);
1146       
1147       return data->procfunc (c, data->aux);
1148     }
1149
1150   /* Compare the value of each SPLIT FILE variable to the values on
1151      the previous case. */
1152   split = dict_get_split_vars (default_dict);
1153   split_cnt = dict_get_split_cnt (default_dict);
1154   for (i = 0; i < split_cnt; i++)
1155     {
1156       struct variable *v = split[i];
1157       
1158       switch (v->type)
1159         {
1160         case NUMERIC:
1161           if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
1162             goto not_equal;
1163           break;
1164         case ALPHA:
1165           if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
1166             goto not_equal;
1167           break;
1168         default:
1169           assert (0);
1170         }
1171     }
1172   return data->procfunc (c, data->aux);
1173   
1174 not_equal:
1175   /* The values of the SPLIT FILE variable are different from the
1176      values on the previous case.  That means that it's time to begin
1177      a new series. */
1178   if (data->endfunc != NULL)
1179     data->endfunc (data->aux);
1180   dump_splits (c);
1181   if (data->beginfunc != NULL)
1182     data->beginfunc (data->aux);
1183   memcpy (prev_case, c, vfm_sink_info.case_size);
1184   return data->procfunc (c, data->aux);
1185 }
1186 \f
1187 /* Case compaction. */
1188
1189 /* Copies case SRC to case DEST, compacting it in the process. */
1190 void
1191 compact_case (struct ccase *dest, const struct ccase *src)
1192 {
1193   int i;
1194   int nval = 0;
1195   size_t var_cnt;
1196   
1197   assert (compaction_necessary);
1198
1199   if (temporary == 2)
1200     {
1201       if (dest != compaction_case)
1202         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1203       return;
1204     }
1205
1206   /* Copy all the variables except the scratch variables from SRC to
1207      DEST. */
1208   var_cnt = dict_get_var_cnt (default_dict);
1209   for (i = 0; i < var_cnt; i++)
1210     {
1211       struct variable *v = dict_get_var (default_dict, i);
1212       
1213       if (v->name[0] == '#')
1214         continue;
1215
1216       if (v->type == NUMERIC)
1217         dest->data[nval++] = src->data[v->fv];
1218       else
1219         {
1220           int w = DIV_RND_UP (v->width, sizeof (union value));
1221           
1222           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1223           nval += w;
1224         }
1225     }
1226 }
1227
1228 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1229 static void
1230 finish_compaction (void)
1231 {
1232   int i;
1233
1234   for (i = 0; i < dict_get_var_cnt (default_dict); )
1235     {
1236       struct variable *v = dict_get_var (default_dict, i);
1237
1238       if (v->name[0] == '#') 
1239         dict_delete_var (default_dict, v);
1240       else
1241         i++;
1242     }
1243   dict_compact_values (default_dict);
1244 }
1245
1246