331f287f8c4d1de47283e8da46d909ee62d83c4f
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 /* AIX requires this to be the first thing in the file.  */
21 #include <config.h>
22 #if __GNUC__
23 #define alloca __builtin_alloca
24 #else
25 #if HAVE_ALLOCA_H
26 #include <alloca.h>
27 #else
28 #ifdef _AIX
29 #pragma alloca
30 #else
31 #ifndef alloca                  /* predefined by HP cc +Olibcalls */
32 char *alloca ();
33 #endif
34 #endif
35 #endif
36 #endif
37
38 #include <assert.h>
39 #include <errno.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #if HAVE_UNISTD_H
43 #include <unistd.h>     /* Required by SunOS4. */
44 #endif
45 #include "alloc.h"
46 #include "approx.h"
47 #include "do-ifP.h"
48 #include "error.h"
49 #include "expr.h"
50 #include "misc.h"
51 #include "random.h"
52 #include "som.h"
53 #include "str.h"
54 #include "tab.h"
55 #include "var.h"
56 #include "vector.h"
57 #include "vfm.h"
58 #include "vfmP.h"
59
60 /*
61    Virtual File Manager (vfm):
62
63    vfm is used to process data files.  It uses the model that data is
64    read from one stream (the data source), then written to another
65    (the data sink).  The data source is then deleted and the data sink
66    becomes the data source for the next procedure. */
67
68 #undef DEBUGGING
69 /*#define DEBUGGING 1 */
70 #include "debug-print.h"
71
72 /* This is used to read from the active file. */
73 struct case_stream *vfm_source;
74
75 /* `value' indexes to initialize to particular values for certain cases. */
76 struct long_vec reinit_sysmis;          /* SYSMIS for every case. */
77 struct long_vec reinit_blanks;          /* Blanks for every case. */
78 struct long_vec init_zero;              /* Zero for first case only. */
79 struct long_vec init_blanks;            /* Blanks for first case only. */
80
81 /* This is used to write to the replacement active file. */
82 struct case_stream *vfm_sink;
83
84 /* Information about the data source. */
85 struct stream_info vfm_source_info;
86
87 /* Information about the data sink. */
88 struct stream_info vfm_sink_info;
89
90 /* Filter variable and  `value' index. */
91 static struct variable *filter_var;
92 static int filter_index;
93
94 #define FILTERED                                                        \
95         (filter_index != -1                                             \
96          && (temp_case->data[filter_index].f == 0.0                     \
97              || temp_case->data[filter_index].f == SYSMIS               \
98              || is_num_user_missing (temp_case->data[filter_index].f,   \
99                                      filter_var)))
100
101 /* Nonzero if the case needs to have values deleted before being
102    stored, zero otherwise. */
103 int compaction_necessary;
104
105 /* Number of values after compaction, or the same as
106    vfm_sink_info.nval, if compaction is not necessary. */
107 int compaction_nval;
108
109 /* Temporary case buffer with enough room for `compaction_nval'
110    `value's. */
111 struct ccase *compaction_case;
112
113 /* Within a session, when paging is turned on, it is never turned back
114    off.  This policy might be too aggressive. */
115 static int paging = 0;
116
117 /* Time at which vfm was last invoked. */
118 time_t last_vfm_invocation;
119
120 /* Functions called during procedure processing. */
121 static int (*proc_func) (struct ccase *);       /* Called for each case. */
122 static int (*virt_proc_func) (struct ccase *);  /* From SPLIT_FILE_procfunc. */
123 static void (*begin_func) (void);       /* Called at beginning of a series. */
124 static void (*virt_begin_func) (void);  /* Called by SPLIT_FILE_procfunc. */
125 static void (*end_func) (void); /* Called after end of a series. */
126 int (*write_case) (void);
127
128 /* Number of cases passed to proc_func(). */
129 static int case_count;
130
131 /* Lag queue. */
132 int n_lag;                      /* Number of cases to lag. */
133 static int lag_count;           /* Number of cases in lag_queue so far. */
134 static int lag_head;            /* Index where next case will be added. */
135 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
136
137 static void open_active_file (void);
138 static void close_active_file (void);
139 static int SPLIT_FILE_procfunc (struct ccase *);
140 static void finish_compaction (void);
141 static void lag_case (void);
142 static int procedure_write_case (void);
143 \f
144 /* Public functions. */
145
146 /* Reads all the cases from the active file, transforms them by the
147    active set of transformations, calls PROCFUNC with CURCASE set to
148    the case and CASENUM set to the case number, and writes them to a
149    new active file.
150
151    Divides the active file into zero or more series of one or more
152    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
153    called after each series. */
154 void
155 procedure (void (*beginfunc) (void),
156            int (*procfunc) (struct ccase *curcase),
157            void (*endfunc) (void))
158 {
159   end_func = endfunc;
160   write_case = procedure_write_case;
161
162   if (default_dict.n_splits && procfunc != NULL)
163     {
164       virt_proc_func = procfunc;
165       proc_func = SPLIT_FILE_procfunc;
166       
167       virt_begin_func = beginfunc;
168       begin_func = NULL;
169     } else {
170       begin_func = beginfunc;
171       proc_func = procfunc;
172     }
173
174   last_vfm_invocation = time (NULL);
175
176   open_active_file ();
177   vfm_source->read ();
178   close_active_file ();
179 }
180 \f
181 /* Active file processing support.  Subtly different semantics from
182    procedure(). */
183
184 static int process_active_file_write_case (void);
185
186 /* The casefunc might want us to stop calling it. */
187 static int not_canceled;
188
189 /* Reads all the cases from the active file and passes them one-by-one
190    to CASEFUNC in temp_case.  Before any cases are passed, calls
191    BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
192    BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
193    file by calling process_active_file_output_case().
194
195    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
196 void
197 process_active_file (void (*beginfunc) (void),
198                      int (*casefunc) (struct ccase *curcase),
199                      void (*endfunc) (void))
200 {
201   proc_func = casefunc;
202   write_case = process_active_file_write_case;
203   not_canceled = 1;
204
205   open_active_file ();
206   beginfunc ();
207   
208   /* There doesn't necessarily need to be an active file. */
209   if (vfm_source)
210     vfm_source->read ();
211   
212   endfunc ();
213   close_active_file ();
214 }
215
216 /* Pass the current case to casefunc. */
217 static int
218 process_active_file_write_case (void)
219 {
220   /* Index of current transformation. */
221   int cur_trns;
222
223   for (cur_trns = f_trns ; cur_trns != temp_trns; )
224     {
225       int code;
226         
227       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
228       switch (code)
229         {
230         case -1:
231           /* Next transformation. */
232           cur_trns++;
233           break;
234         case -2:
235           /* Delete this case. */
236           goto done;
237         default:
238           /* Go to that transformation. */
239           cur_trns = code;
240           break;
241         }
242     }
243
244   if (n_lag)
245     lag_case ();
246           
247   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
248   if (not_canceled
249       && !FILTERED
250       && (process_if_expr == NULL ||
251           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
252     not_canceled = proc_func (temp_case);
253   
254   case_count++;
255   
256  done:
257   {
258     long *lp;
259
260     /* This case is finished.  Initialize the variables for the next case. */
261     for (lp = reinit_sysmis.vec; *lp != -1;)
262       temp_case->data[*lp++].f = SYSMIS;
263     for (lp = reinit_blanks.vec; *lp != -1;)
264       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
265   }
266   
267   return 1;
268 }
269
270 /* Write temp_case to the active file. */
271 void
272 process_active_file_output_case (void)
273 {
274   vfm_sink_info.ncases++;
275   vfm_sink->write ();
276 }
277 \f
278 /* Opening the active file. */
279
280 /* It might be usefully noted that the following several functions are
281    given in the order that they are called by open_active_file(). */
282
283 /* Prepare to write to the replacement active file. */
284 static void
285 prepare_for_writing (void)
286 {
287   /* FIXME: If ALL the conditions listed below hold true, then the
288      replacement active file is guaranteed to be identical to the
289      original active file:
290
291      1. TEMPORARY was the first transformation, OR, there were no
292      transformations at all.
293
294      2. Input is not coming from an input program.
295
296      3. Compaction is not necessary.
297
298      So, in this case, we shouldn't have to replace the active
299      file--it's just a waste of time and space. */
300
301   vfm_sink_info.ncases = 0;
302   vfm_sink_info.nval = default_dict.nval;
303   vfm_sink_info.case_size = (sizeof (struct ccase)
304                              + (default_dict.nval - 1) * sizeof (union value));
305   
306   if (vfm_sink == NULL)
307     {
308       if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
309           && !paging)
310         {
311           msg (MW, _("Workspace overflow predicted.  Max workspace is "
312                      "currently set to %d KB (%d cases at %d bytes each).  "
313                      "Paging active file to disk."),
314                MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
315                vfm_sink_info.case_size);
316           
317           paging = 1;
318         }
319       
320       vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
321     }
322 }
323
324 /* Arrange for compacting the output cases for storage. */
325 static void
326 arrange_compaction (void)
327 {
328   int count_values = 0;
329
330   {
331     int i;
332     
333     /* Count up the number of `value's that will be output. */
334     for (i = 0; i < temp_dict->nvar; i++)
335       if (temp_dict->var[i]->name[0] != '#')
336         {
337           assert (temp_dict->var[i]->nv > 0);
338           count_values += temp_dict->var[i]->nv;
339         }
340     assert (temporary == 2 || count_values <= temp_dict->nval);
341   }
342   
343   /* Compaction is only necessary if the number of `value's to output
344      differs from the number already present. */
345   compaction_nval = count_values;
346   compaction_necessary = temporary == 2 || count_values != temp_dict->nval;
347   
348   if (vfm_sink->init)
349     vfm_sink->init ();
350 }
351
352 /* Prepares the temporary case and compaction case. */
353 static void
354 make_temp_case (void)
355 {
356   temp_case = xmalloc (vfm_sink_info.case_size);
357
358   if (compaction_necessary)
359     compaction_case = xmalloc (sizeof (struct ccase)
360                                + sizeof (union value) * (compaction_nval - 1));
361 }
362
363 #if DEBUGGING
364 /* Returns the name of the variable that owns the index CCASE_INDEX
365    into ccase. */
366 static const char *
367 index_to_varname (int ccase_index)
368 {
369   int i;
370
371   for (i = 0; i < default_dict.nvar; i++)
372     {
373       variable *v = default_dict.var[i];
374       
375       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
376         return default_dict.var[i]->name;
377     }
378   return _("<NOVAR>");
379 }
380 #endif
381
382 /* Initializes temp_case from the vectors that say which `value's need
383    to be initialized just once, and which ones need to be
384    re-initialized before every case. */
385 static void
386 vector_initialization (void)
387 {
388   int i;
389   long *lp;
390   
391   /* Just once. */
392   for (i = 0; i < init_zero.n; i++)
393     temp_case->data[init_zero.vec[i]].f = 0.0;
394   for (i = 0; i < init_blanks.n; i++)
395     memset (temp_case->data[init_blanks.vec[i]].s, ' ', MAX_SHORT_STRING);
396
397   /* These vectors need to be repeatedly accessed, so we add a
398      sentinel to (hopefully) improve speed. */
399   vec_insert (&reinit_sysmis, -1);
400   vec_insert (&reinit_blanks, -1);
401
402   for (lp = reinit_sysmis.vec; *lp != -1;)
403     temp_case->data[*lp++].f = SYSMIS;
404   for (lp = reinit_blanks.vec; *lp != -1;)
405     memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
406   
407 #if DEBUGGING
408   printf ("vfm: init_zero=");
409   for (i = 0; i < init_zero.n; i++)
410     printf ("%s%s", i ? "," : "", index_to_varname (init_zero.vec[i]));
411   printf (" init_blanks=");
412   for (i = 0; i < init_blanks.n; i++)
413     printf ("%s%s", i ? "," : "", index_to_varname (init_blanks.vec[i]));
414   printf (" reinit_sysmis=");
415   for (lp = reinit_sysmis.vec; *lp != -1; lp++)
416     printf ("%s%s", lp != reinit_sysmis.vec ? "," : "",
417             index_to_varname (*lp));
418   printf (" reinit_blanks=");
419   for (lp = reinit_blanks.vec; *lp != -1; lp++)
420     printf ("%s%s", lp != reinit_blanks.vec ? "," : "",
421             index_to_varname (*lp));
422   printf ("\n");
423 #endif
424 }
425
426 /* Sets filter_index to an appropriate value. */
427 static void
428 setup_filter (void)
429 {
430   filter_index = -1;
431   
432   if (default_dict.filter_var[0])
433     {
434       struct variable *fv = find_variable (default_dict.filter_var);
435       
436       if (fv == NULL || fv->type == ALPHA)
437         default_dict.filter_var[0] = 0;
438       else
439         {
440           filter_index = fv->index;
441           filter_var = fv;
442         }
443     }
444 }
445
446 /* Sets all the lag-related variables based on value of n_lag. */
447 static void
448 setup_lag (void)
449 {
450   int i;
451   
452   if (n_lag == 0)
453     return;
454
455   lag_count = 0;
456   lag_head = 0;
457   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
458   for (i = 0; i < n_lag; i++)
459     lag_queue[i] = xmalloc (temp_dict->nval * sizeof **lag_queue);
460 }
461
462 /* There is a lot of potential confusion in the vfm and related
463    routines over the number of `value's at each stage of the process.
464    Here is each nval count, with explanation, as set up by
465    open_active_file():
466
467    vfm_source_info.nval: Number of `value's in the cases returned by
468    the source stream.  This value turns out not to be very useful, but
469    we maintain it anyway.
470
471    vfm_sink_info.nval: Number of `value's in the cases after all
472    transformations have been performed.  Never less than
473    vfm_source_info.nval.
474
475    temp_dict->nval: Number of `value's in the cases after the
476    transformations leading up to TEMPORARY have been performed.  If
477    TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
478    Never less than vfm_sink_info.nval.
479
480    compaction_nval: Number of `value's in the cases after the
481    transformations leading up to TEMPORARY have been performed and the
482    case has been compacted by compact_case(), if compaction is
483    necessary.  This the number of `value's in the cases saved by the
484    sink stream.  (However, note that the cases passed to the sink
485    stream have not yet been compacted.  It is the responsibility of
486    the data sink to call compact_case().)  This may be less than,
487    greater than, or equal to vfm_source_info.nval.  `compaction'
488    becomes the new value of default_dict.nval after the procedure is
489    completed.
490
491    default_dict.nval: This is often an alias for temp_dict->nval.  As
492    such it can really have no separate existence until the procedure
493    is complete.  For this reason it should *not* be referenced inside
494    the execution of a procedure. */
495 /* Makes all preparations for reading from the data source and writing
496    to the data sink. */
497 static void
498 open_active_file (void)
499 {
500   /* Sometimes we want to refer to the dictionary that applies to the
501      data actually written to the sink.  This is either temp_dict or
502      default_dict.  However, if TEMPORARY is not on, then temp_dict
503      does not apply.  So, we can set temp_dict to default_dict in this
504      case. */
505   if (!temporary)
506     {
507       temp_trns = n_trns;
508       temp_dict = &default_dict;
509     }
510
511   /* No cases passed to the procedure yet. */
512   case_count = 0;
513
514   /* The rest. */
515   prepare_for_writing ();
516   arrange_compaction ();
517   make_temp_case ();
518   vector_initialization ();
519   setup_randomize ();
520   discard_ctl_stack ();
521   setup_filter ();
522   setup_lag ();
523
524   /* Debug output. */
525   debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
526                  vfm_source->name, vfm_sink->name));
527   debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
528                  "temp_dict->nval=%d, compaction_nval=%d, "
529                  "default_dict.nval=%d\n",
530                  vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
531                  compaction_nval, default_dict.nval));
532 }
533 \f
534 /* Closes the active file. */
535 static void
536 close_active_file (void)
537 {
538   /* Close the current case group. */
539   if (case_count && end_func != NULL)
540     end_func ();
541
542   /* Stop lagging (catch up?). */
543   if (n_lag)
544     {
545       int i;
546       
547       for (i = 0; i < n_lag; i++)
548         free (lag_queue[i]);
549       free (lag_queue);
550       n_lag = 0;
551     }
552   
553   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
554      off TEMPORARY. */
555   if (temporary)
556     {
557       restore_dictionary (temp_dict);
558       temp_dict = NULL;
559     }
560
561   /* The default dictionary assumes the compacted data size. */
562   default_dict.nval = compaction_nval;
563     
564   /* Old data sink --> New data source. */
565   if (vfm_source && vfm_source->destroy_source)
566     vfm_source->destroy_source ();
567   
568   vfm_source = vfm_sink;
569   vfm_source_info.ncases = vfm_sink_info.ncases;
570   vfm_source_info.nval = compaction_nval;
571   vfm_source_info.case_size = (sizeof (struct ccase)
572                                + (compaction_nval - 1) * sizeof (union value));
573   if (vfm_source->mode)
574     vfm_source->mode ();
575
576   /* Old data sink is gone now. */
577   vfm_sink = NULL;
578
579   /* Finish compaction. */
580   if (compaction_necessary)
581     finish_compaction ();
582   cancel_temporary ();
583
584   /* Free temporary cases. */
585   free (temp_case);
586   temp_case = NULL;
587
588   free (compaction_case);
589   compaction_case = NULL;
590
591   /* Cancel PROCESS IF. */
592   expr_free (process_if_expr);
593   process_if_expr = NULL;
594
595   /* Cancel FILTER if temporary. */
596   if (filter_index != -1 && !FILTER_before_TEMPORARY)
597     default_dict.filter_var[0] = 0;
598
599   /* Cancel transformations. */
600   cancel_transformations ();
601
602   /* Clear value-initialization vectors. */
603   vec_clear (&init_zero);
604   vec_clear (&init_blanks);
605   vec_clear (&reinit_sysmis);
606   vec_clear (&reinit_blanks);
607
608   /* Turn off case limiter. */
609   default_dict.N = 0;
610
611   /* Clear VECTOR vectors. */
612   {
613     int i;
614
615     for (i = 0; i < nvec; i++)
616       free (vec[i].v);
617     free (vec);
618     vec = NULL;
619     nvec = 0;
620   }
621
622   debug_printf (("vfm: procedure complete\n\n"));
623 }
624 \f
625 /* Disk case stream. */
626
627 /* Associated files. */
628 FILE *disk_source_file;
629 FILE *disk_sink_file;
630
631 /* Initializes the disk sink. */
632 static void
633 disk_stream_init (void)
634 {
635   disk_sink_file = tmpfile ();
636   if (!disk_sink_file)
637     {
638       msg (ME, _("An error occurred attempting to create a temporary "
639                  "file for use as the active file: %s."),
640            strerror (errno));
641       err_failure ();
642     }
643 }
644
645 /* Reads all cases from the disk source and passes them one by one to
646    write_case(). */
647 static void
648 disk_stream_read (void)
649 {
650   int i;
651
652   for (i = 0; i < vfm_source_info.ncases; i++)
653     {
654       if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
655         {
656           msg (ME, _("An error occurred while attempting to read from "
657                "a temporary file created for the active file: %s."),
658                strerror (errno));
659           err_failure ();
660           return;
661         }
662
663       if (!write_case ())
664         return;
665     }
666 }
667
668 /* Writes temp_case to the disk sink. */
669 static void
670 disk_stream_write (void)
671 {
672   union value *src_case;
673
674   if (compaction_necessary)
675     {
676       compact_case (compaction_case, temp_case);
677       src_case = (union value *) compaction_case;
678     }
679   else src_case = (union value *) temp_case;
680
681   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
682               disk_sink_file) != 1)
683     {
684       msg (ME, _("An error occurred while attempting to write to a "
685                  "temporary file used as the active file: %s."),
686            strerror (errno));
687       err_failure ();
688     }
689 }
690
691 /* Switches the stream from a sink to a source. */
692 static void
693 disk_stream_mode (void)
694 {
695   /* Rewind the sink. */
696   if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
697     {
698       msg (ME, _("An error occurred while attempting to rewind a "
699                  "temporary file used as the active file: %s."),
700            strerror (errno));
701       err_failure ();
702     }
703   
704   /* Sink --> source variables. */
705   disk_source_file = disk_sink_file;
706 }
707
708 /* Destroys the source's internal data. */
709 static void
710 disk_stream_destroy_source (void)
711 {
712   if (disk_source_file)
713     {
714       fclose (disk_source_file);
715       disk_source_file = NULL;
716     }
717 }
718
719 /* Destroys the sink's internal data. */
720 static void
721 disk_stream_destroy_sink (void)
722 {
723   if (disk_sink_file)
724     {
725       fclose (disk_sink_file);
726       disk_sink_file = NULL;
727     }
728 }
729
730 /* Disk stream. */
731 struct case_stream vfm_disk_stream = 
732   {
733     disk_stream_init,
734     disk_stream_read,
735     disk_stream_write,
736     disk_stream_mode,
737     disk_stream_destroy_source,
738     disk_stream_destroy_sink,
739     "disk",
740   };
741 \f
742 /* Memory case stream. */
743
744 /* List of cases stored in the stream. */
745 struct case_list *memory_source_cases;
746 struct case_list *memory_sink_cases;
747
748 /* Current case. */
749 struct case_list *memory_sink_iter;
750
751 /* Maximum number of cases. */
752 int memory_sink_max_cases;
753
754 /* Initializes the memory stream variables for writing. */
755 static void
756 memory_stream_init (void)
757 {
758   memory_sink_cases = NULL;
759   memory_sink_iter = NULL;
760   
761   assert (compaction_nval);
762   memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
763 }
764
765 /* Reads the case stream from memory and passes it to write_case(). */
766 static void
767 memory_stream_read (void)
768 {
769   while (memory_source_cases != NULL)
770     {
771       memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
772       
773       {
774         struct case_list *current = memory_source_cases;
775         memory_source_cases = memory_source_cases->next;
776         free (current);
777       }
778       
779       if (!write_case ())
780         return;
781     }
782 }
783
784 /* Writes temp_case to the memory stream. */
785 static void
786 memory_stream_write (void)
787 {
788   struct case_list *new_case = malloc (sizeof (struct case_list)
789                                        + ((compaction_nval - 1)
790                                           * sizeof (union value)));
791
792   /* If we've got memory to spare then add it to the linked list. */
793   if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
794     {
795       if (compaction_necessary)
796         compact_case (&new_case->c, temp_case);
797       else
798         memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
799
800       /* Append case to linked list. */
801       if (memory_sink_cases)
802         memory_sink_iter = memory_sink_iter->next = new_case;
803       else
804         memory_sink_iter = memory_sink_cases = new_case;
805     }
806   else
807     {
808       /* Out of memory.  Write the active file to disk. */
809       struct case_list *cur, *next;
810
811       /* Notify the user. */
812       if (!new_case)
813         msg (MW, _("Virtual memory exhausted.  Paging active file "
814                    "to disk."));
815       else
816         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
817                    "overflowed.  Paging active file to disk."),
818              MAX_WORKSPACE / 1024, memory_sink_max_cases,
819              compaction_nval * sizeof (union value));
820
821       free (new_case);
822
823       /* Switch to a disk sink. */
824       vfm_sink = &vfm_disk_stream;
825       vfm_sink->init ();
826       paging = 1;
827
828       /* Terminate the list. */
829       if (memory_sink_iter)
830         memory_sink_iter->next = NULL;
831
832       /* Write the cases to disk and destroy them.  We can't call
833          vfm->sink->write() because of compaction. */
834       for (cur = memory_sink_cases; cur; cur = next)
835         {
836           next = cur->next;
837           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
838                       disk_sink_file) != 1)
839             {
840               msg (ME, _("An error occurred while attempting to "
841                          "write to a temporary file created as the "
842                          "active file, while paging to disk: %s."),
843                    strerror (errno));
844               err_failure ();
845             }
846           free (cur);
847         }
848
849       /* Write the current case to disk. */
850       vfm_sink->write ();
851     }
852 }
853
854 /* If the data is stored in memory, causes it to be written to disk.
855    To be called only *between* procedure()s, not within them. */
856 void
857 page_to_disk (void)
858 {
859   if (vfm_source == &vfm_memory_stream)
860     {
861       /* Switch to a disk sink. */
862       vfm_sink = &vfm_disk_stream;
863       vfm_sink->init ();
864       paging = 1;
865       
866       /* Write the cases to disk and destroy them.  We can't call
867          vfm->sink->write() because of compaction. */
868       {
869         struct case_list *cur, *next;
870         
871         for (cur = memory_source_cases; cur; cur = next)
872           {
873             next = cur->next;
874             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
875                         disk_sink_file) != 1)
876               {
877                 msg (ME, _("An error occurred while attempting to "
878                            "write to a temporary file created as the "
879                            "active file, while paging to disk: %s."),
880                      strerror (errno));
881                 err_failure ();
882               }
883             free (cur);
884           }
885       }
886       
887       vfm_source = &vfm_disk_stream;
888       vfm_source->mode ();
889
890       vfm_sink = NULL;
891     }
892 }
893
894 /* Switch the memory stream from sink to source mode. */
895 static void
896 memory_stream_mode (void)
897 {
898   /* Terminate the list. */
899   if (memory_sink_iter)
900     memory_sink_iter->next = NULL;
901
902   /* Sink --> source variables. */
903   memory_source_cases = memory_sink_cases;
904   memory_sink_cases = NULL;
905 }
906
907 /* Destroy all memory source data. */
908 static void
909 memory_stream_destroy_source (void)
910 {
911   struct case_list *cur, *next;
912   
913   for (cur = memory_source_cases; cur; cur = next)
914     {
915       next = cur->next;
916       free (cur);
917     }
918   memory_source_cases = NULL;
919 }
920
921 /* Destroy all memory sink data. */
922 static void
923 memory_stream_destroy_sink (void)
924 {
925   struct case_list *cur, *next;
926   
927   for (cur = memory_sink_cases; cur; cur = next)
928     {
929       next = cur->next;
930       free (cur);
931     }
932   memory_sink_cases = NULL;
933 }
934   
935 /* Memory stream. */
936 struct case_stream vfm_memory_stream = 
937   {
938     memory_stream_init,
939     memory_stream_read,
940     memory_stream_write,
941     memory_stream_mode,
942     memory_stream_destroy_source,
943     memory_stream_destroy_sink,
944     "memory",
945   };
946 \f
947 #undef DEBUGGING
948 #include "debug-print.h"
949
950 /* Add temp_case to the lag queue. */
951 static void
952 lag_case (void)
953 {
954   if (lag_count < n_lag)
955     lag_count++;
956   memcpy (lag_queue[lag_head], temp_case, sizeof (union value) * temp_dict->nval);
957   if (++lag_head >= n_lag)
958     lag_head = 0;
959 }
960
961 /* Returns a pointer to the lagged case from N_BEFORE cases before the
962    current one, or NULL if there haven't been that many cases yet. */
963 struct ccase *
964 lagged_case (int n_before)
965 {
966   assert (n_before <= n_lag);
967   if (n_before > lag_count)
968     return NULL;
969   
970   {
971     int index = lag_head - n_before;
972     if (index < 0)
973       index += n_lag;
974     return lag_queue[index];
975   }
976 }
977    
978 /* Transforms temp_case and writes it to the replacement active file
979    if advisable.  Returns nonzero if more cases can be accepted, zero
980    otherwise.  Do not call this function again after it has returned
981    zero once.  */
982 int
983 procedure_write_case (void)
984 {
985   /* Index of current transformation. */
986   int cur_trns;
987
988   /* Return value: whether it's reasonable to write any more cases. */
989   int more_cases = 1;
990
991   debug_printf ((_("transform: ")));
992
993   cur_trns = f_trns;
994   for (;;)
995     {
996       /* Output the case if this is temp_trns. */
997       if (cur_trns == temp_trns)
998         {
999           debug_printf (("REC"));
1000
1001           if (n_lag)
1002             lag_case ();
1003           
1004           vfm_sink_info.ncases++;
1005           vfm_sink->write ();
1006
1007           if (default_dict.N)
1008             more_cases = vfm_sink_info.ncases < default_dict.N;
1009         }
1010
1011       /* Are we done? */
1012       if (cur_trns >= n_trns)
1013         break;
1014       
1015       debug_printf (("$%d", cur_trns));
1016
1017       /* Decide which transformation should come next. */
1018       {
1019         int code;
1020         
1021         code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
1022         switch (code)
1023           {
1024           case -1:
1025             /* Next transformation. */
1026             cur_trns++;
1027             break;
1028           case -2:
1029             /* Delete this case. */
1030             goto done;
1031           default:
1032             /* Go to that transformation. */
1033             cur_trns = code;
1034             break;
1035           }
1036       }
1037     }
1038
1039   /* Call the beginning of group function. */
1040   if (!case_count && begin_func != NULL)
1041     begin_func ();
1042
1043   /* Call the procedure if there is one and FILTER and PROCESS IF
1044      don't prohibit it. */
1045   if (proc_func != NULL
1046       && !FILTERED
1047       && (process_if_expr == NULL ||
1048           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
1049     proc_func (temp_case);
1050
1051   case_count++;
1052   
1053 done:
1054   debug_putc ('\n', stdout);
1055   
1056   {
1057     long *lp;
1058
1059     /* This case is finished.  Initialize the variables for the next case. */
1060     for (lp = reinit_sysmis.vec; *lp != -1;)
1061       temp_case->data[*lp++].f = SYSMIS;
1062     for (lp = reinit_blanks.vec; *lp != -1;)
1063       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
1064   }
1065   
1066   /* Return previously determined value. */
1067   return more_cases;
1068 }
1069
1070 /* Appends TRNS to t_trns[], the list of all transformations to be
1071    performed on data as it is read from the active file. */
1072 void
1073 add_transformation (struct trns_header * trns)
1074 {
1075   if (n_trns >= m_trns)
1076     {
1077       m_trns += 16;
1078       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1079     }
1080   t_trns[n_trns] = trns;
1081   trns->index = n_trns++;
1082 }
1083
1084 /* Cancels all active transformations, including any transformations
1085    created by the input program. */
1086 void
1087 cancel_transformations (void)
1088 {
1089   int i;
1090   for (i = 0; i < n_trns; i++)
1091     {
1092       if (t_trns[i]->free)
1093         t_trns[i]->free (t_trns[i]);
1094       free (t_trns[i]);
1095     }
1096   n_trns = f_trns = 0;
1097   if (m_trns > 32)
1098     {
1099       free (t_trns);
1100       m_trns = 0;
1101     }
1102 }
1103
1104 /* Dumps out the values of all the split variables for the case C. */
1105 static void
1106 dump_splits (struct ccase *c)
1107 {
1108   struct variable **iter;
1109   struct tab_table *t;
1110   int i;
1111
1112   t = tab_create (3, default_dict.n_splits + 1, 0);
1113   tab_dim (t, tab_natural_dimensions);
1114   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, default_dict.n_splits);
1115   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, default_dict.n_splits);
1116   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1117   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1118   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1119   for (iter = default_dict.splits, i = 0; *iter; iter++, i++)
1120     {
1121       struct variable *v = *iter;
1122       char temp_buf[80];
1123       char *val_lab;
1124
1125       assert (v->type == NUMERIC || v->type == ALPHA);
1126       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1127       
1128       {
1129         union value val = c->data[v->fv];
1130         if (v->type == ALPHA)
1131           val.c = c->data[v->fv].s;
1132         data_out (temp_buf, &v->print, &val);
1133       }
1134       
1135       temp_buf[v->print.w] = 0;
1136       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1137
1138       val_lab = get_val_lab (v, c->data[v->fv], 0);
1139       if (val_lab)
1140         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1141     }
1142   tab_flags (t, SOMF_NO_TITLE);
1143   tab_submit (t);
1144 }
1145
1146 /* This procfunc is substituted for the user-supplied procfunc when
1147    SPLIT FILE is active.  This function forms a wrapper around that
1148    procfunc by dividing the input into series. */
1149 static int
1150 SPLIT_FILE_procfunc (struct ccase *c)
1151 {
1152   static struct ccase *prev_case;
1153   struct variable **iter;
1154
1155   /* The first case always begins a new series.  We also need to
1156      preserve the values of the case for later comparison. */
1157   if (case_count == 0)
1158     {
1159       if (prev_case)
1160         free (prev_case);
1161       prev_case = xmalloc (vfm_sink_info.case_size);
1162       memcpy (prev_case, c, vfm_sink_info.case_size);
1163
1164       dump_splits (c);
1165       if (virt_begin_func != NULL)
1166         virt_begin_func ();
1167       
1168       return virt_proc_func (c);
1169     }
1170
1171   /* Compare the value of each SPLIT FILE variable to the values on
1172      the previous case. */
1173   for (iter = default_dict.splits; *iter; iter++)
1174     {
1175       struct variable *v = *iter;
1176       
1177       switch (v->type)
1178         {
1179         case NUMERIC:
1180           if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
1181             goto not_equal;
1182           break;
1183         case ALPHA:
1184           if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
1185             goto not_equal;
1186           break;
1187         default:
1188           assert (0);
1189         }
1190     }
1191   return virt_proc_func (c);
1192   
1193 not_equal:
1194   /* The values of the SPLIT FILE variable are different from the
1195      values on the previous case.  That means that it's time to begin
1196      a new series. */
1197   if (end_func != NULL)
1198     end_func ();
1199   dump_splits (c);
1200   if (virt_begin_func != NULL)
1201     virt_begin_func ();
1202   memcpy (prev_case, c, vfm_sink_info.case_size);
1203   return virt_proc_func (c);
1204 }
1205 \f
1206 /* Case compaction. */
1207
1208 /* Copies case SRC to case DEST, compacting it in the process. */
1209 void
1210 compact_case (struct ccase *dest, const struct ccase *src)
1211 {
1212   int i;
1213   int nval = 0;
1214   
1215   assert (compaction_necessary);
1216
1217   if (temporary == 2)
1218     {
1219       if (dest != compaction_case)
1220         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1221       return;
1222     }
1223
1224   /* Copy all the variables except the scratch variables from SRC to
1225      DEST. */
1226   for (i = 0; i < default_dict.nvar; i++)
1227     {
1228       struct variable *v = default_dict.var[i];
1229       
1230       if (v->name[0] == '#')
1231         continue;
1232
1233       if (v->type == NUMERIC)
1234         dest->data[nval++] = src->data[v->fv];
1235       else
1236         {
1237           int w = DIV_RND_UP (v->width, sizeof (union value));
1238           
1239           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1240           nval += w;
1241         }
1242     }
1243 }
1244
1245 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1246 static void
1247 finish_compaction (void)
1248 {
1249   int copy_index = 0;
1250   int nval = 0;
1251   int i;
1252
1253   for (i = 0; i < default_dict.nvar; i++)
1254     {
1255       struct variable *v = default_dict.var[i];
1256
1257       if (v->name[0] == '#')
1258         {
1259           clear_variable (&default_dict, v);
1260           free (v);
1261           continue;
1262         }
1263
1264       v->fv = nval;
1265       if (v->type == NUMERIC)
1266         nval++;
1267       else
1268         nval += DIV_RND_UP (v->width, sizeof (union value));
1269       
1270       default_dict.var[copy_index++] = v;
1271     }
1272   if (copy_index != default_dict.nvar)
1273     {
1274       default_dict.var = xrealloc (default_dict.var,
1275                                    sizeof *default_dict.var * copy_index);
1276       default_dict.nvar = copy_index;
1277     }
1278 }
1279
1280