Added a --enable-debug option to configure and
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 /* AIX requires this to be the first thing in the file.  */
21 #include <config.h>
22 #if __GNUC__
23 #define alloca __builtin_alloca
24 #else
25 #if HAVE_ALLOCA_H
26 #include <alloca.h>
27 #else
28 #ifdef _AIX
29 #pragma alloca
30 #else
31 #ifndef alloca                  /* predefined by HP cc +Olibcalls */
32 char *alloca ();
33 #endif
34 #endif
35 #endif
36 #endif
37
38 #include <assert.h>
39 #include <errno.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #if HAVE_UNISTD_H
43 #include <unistd.h>     /* Required by SunOS4. */
44 #endif
45 #include "alloc.h"
46 #include "approx.h"
47 #include "do-ifP.h"
48 #include "error.h"
49 #include "expr.h"
50 #include "misc.h"
51 #include "random.h"
52 #include "som.h"
53 #include "str.h"
54 #include "tab.h"
55 #include "var.h"
56 #include "vector.h"
57 #include "vfm.h"
58 #include "vfmP.h"
59
60 /*
61    Virtual File Manager (vfm):
62
63    vfm is used to process data files.  It uses the model that data is
64    read from one stream (the data source), then written to another
65    (the data sink).  The data source is then deleted and the data sink
66    becomes the data source for the next procedure. */
67
68 #include "debug-print.h"
69
70 /* This is used to read from the active file. */
71 struct case_stream *vfm_source;
72
73 /* `value' indexes to initialize to particular values for certain cases. */
74 struct long_vec reinit_sysmis;          /* SYSMIS for every case. */
75 struct long_vec reinit_blanks;          /* Blanks for every case. */
76 struct long_vec init_zero;              /* Zero for first case only. */
77 struct long_vec init_blanks;            /* Blanks for first case only. */
78
79 /* This is used to write to the replacement active file. */
80 struct case_stream *vfm_sink;
81
82 /* Information about the data source. */
83 struct stream_info vfm_source_info;
84
85 /* Information about the data sink. */
86 struct stream_info vfm_sink_info;
87
88 /* Filter variable and  `value' index. */
89 static struct variable *filter_var;
90 static int filter_index;
91
92 #define FILTERED                                                        \
93         (filter_index != -1                                             \
94          && (temp_case->data[filter_index].f == 0.0                     \
95              || temp_case->data[filter_index].f == SYSMIS               \
96              || is_num_user_missing (temp_case->data[filter_index].f,   \
97                                      filter_var)))
98
99 /* Nonzero if the case needs to have values deleted before being
100    stored, zero otherwise. */
101 int compaction_necessary;
102
103 /* Number of values after compaction, or the same as
104    vfm_sink_info.nval, if compaction is not necessary. */
105 int compaction_nval;
106
107 /* Temporary case buffer with enough room for `compaction_nval'
108    `value's. */
109 struct ccase *compaction_case;
110
111 /* Within a session, when paging is turned on, it is never turned back
112    off.  This policy might be too aggressive. */
113 static int paging = 0;
114
115 /* Time at which vfm was last invoked. */
116 time_t last_vfm_invocation;
117
118 /* Functions called during procedure processing. */
119 static int (*proc_func) (struct ccase *);       /* Called for each case. */
120 static int (*virt_proc_func) (struct ccase *);  /* From SPLIT_FILE_procfunc. */
121 static void (*begin_func) (void);       /* Called at beginning of a series. */
122 static void (*virt_begin_func) (void);  /* Called by SPLIT_FILE_procfunc. */
123 static void (*end_func) (void); /* Called after end of a series. */
124 int (*write_case) (void);
125
126 /* Number of cases passed to proc_func(). */
127 static int case_count;
128
129 /* Lag queue. */
130 int n_lag;                      /* Number of cases to lag. */
131 static int lag_count;           /* Number of cases in lag_queue so far. */
132 static int lag_head;            /* Index where next case will be added. */
133 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
134
135 static void open_active_file (void);
136 static void close_active_file (void);
137 static int SPLIT_FILE_procfunc (struct ccase *);
138 static void finish_compaction (void);
139 static void lag_case (void);
140 static int procedure_write_case (void);
141 \f
142 /* Public functions. */
143
144 /* Reads all the cases from the active file, transforms them by the
145    active set of transformations, calls PROCFUNC with CURCASE set to
146    the case and CASENUM set to the case number, and writes them to a
147    new active file.
148
149    Divides the active file into zero or more series of one or more
150    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
151    called after each series. */
152 void
153 procedure (void (*beginfunc) (void),
154            int (*procfunc) (struct ccase *curcase),
155            void (*endfunc) (void))
156 {
157   end_func = endfunc;
158   write_case = procedure_write_case;
159
160   if (default_dict.n_splits && procfunc != NULL)
161     {
162       virt_proc_func = procfunc;
163       proc_func = SPLIT_FILE_procfunc;
164       
165       virt_begin_func = beginfunc;
166       begin_func = NULL;
167     } else {
168       begin_func = beginfunc;
169       proc_func = procfunc;
170     }
171
172   last_vfm_invocation = time (NULL);
173
174   open_active_file ();
175   vfm_source->read ();
176   close_active_file ();
177 }
178 \f
179 /* Active file processing support.  Subtly different semantics from
180    procedure(). */
181
182 static int process_active_file_write_case (void);
183
184 /* The casefunc might want us to stop calling it. */
185 static int not_canceled;
186
187 /* Reads all the cases from the active file and passes them one-by-one
188    to CASEFUNC in temp_case.  Before any cases are passed, calls
189    BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
190    BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
191    file by calling process_active_file_output_case().
192
193    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
194 void
195 process_active_file (void (*beginfunc) (void),
196                      int (*casefunc) (struct ccase *curcase),
197                      void (*endfunc) (void))
198 {
199   proc_func = casefunc;
200   write_case = process_active_file_write_case;
201   not_canceled = 1;
202
203   open_active_file ();
204   beginfunc ();
205   
206   /* There doesn't necessarily need to be an active file. */
207   if (vfm_source)
208     vfm_source->read ();
209   
210   endfunc ();
211   close_active_file ();
212 }
213
214 /* Pass the current case to casefunc. */
215 static int
216 process_active_file_write_case (void)
217 {
218   /* Index of current transformation. */
219   int cur_trns;
220
221   for (cur_trns = f_trns ; cur_trns != temp_trns; )
222     {
223       int code;
224         
225       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
226       switch (code)
227         {
228         case -1:
229           /* Next transformation. */
230           cur_trns++;
231           break;
232         case -2:
233           /* Delete this case. */
234           goto done;
235         default:
236           /* Go to that transformation. */
237           cur_trns = code;
238           break;
239         }
240     }
241
242   if (n_lag)
243     lag_case ();
244           
245   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
246   if (not_canceled
247       && !FILTERED
248       && (process_if_expr == NULL ||
249           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
250     not_canceled = proc_func (temp_case);
251   
252   case_count++;
253   
254  done:
255   {
256     long *lp;
257
258     /* This case is finished.  Initialize the variables for the next case. */
259     for (lp = reinit_sysmis.vec; *lp != -1;)
260       temp_case->data[*lp++].f = SYSMIS;
261     for (lp = reinit_blanks.vec; *lp != -1;)
262       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
263   }
264   
265   return 1;
266 }
267
268 /* Write temp_case to the active file. */
269 void
270 process_active_file_output_case (void)
271 {
272   vfm_sink_info.ncases++;
273   vfm_sink->write ();
274 }
275 \f
276 /* Opening the active file. */
277
278 /* It might be usefully noted that the following several functions are
279    given in the order that they are called by open_active_file(). */
280
281 /* Prepare to write to the replacement active file. */
282 static void
283 prepare_for_writing (void)
284 {
285   /* FIXME: If ALL the conditions listed below hold true, then the
286      replacement active file is guaranteed to be identical to the
287      original active file:
288
289      1. TEMPORARY was the first transformation, OR, there were no
290      transformations at all.
291
292      2. Input is not coming from an input program.
293
294      3. Compaction is not necessary.
295
296      So, in this case, we shouldn't have to replace the active
297      file--it's just a waste of time and space. */
298
299   vfm_sink_info.ncases = 0;
300   vfm_sink_info.nval = default_dict.nval;
301   vfm_sink_info.case_size = (sizeof (struct ccase)
302                              + (default_dict.nval - 1) * sizeof (union value));
303   
304   if (vfm_sink == NULL)
305     {
306       if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
307           && !paging)
308         {
309           msg (MW, _("Workspace overflow predicted.  Max workspace is "
310                      "currently set to %d KB (%d cases at %d bytes each).  "
311                      "Paging active file to disk."),
312                MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
313                vfm_sink_info.case_size);
314           
315           paging = 1;
316         }
317       
318       vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
319     }
320 }
321
322 /* Arrange for compacting the output cases for storage. */
323 static void
324 arrange_compaction (void)
325 {
326   int count_values = 0;
327
328   {
329     int i;
330     
331     /* Count up the number of `value's that will be output. */
332     for (i = 0; i < temp_dict->nvar; i++)
333       if (temp_dict->var[i]->name[0] != '#')
334         {
335           assert (temp_dict->var[i]->nv > 0);
336           count_values += temp_dict->var[i]->nv;
337         }
338     assert (temporary == 2 || count_values <= temp_dict->nval);
339   }
340   
341   /* Compaction is only necessary if the number of `value's to output
342      differs from the number already present. */
343   compaction_nval = count_values;
344   compaction_necessary = temporary == 2 || count_values != temp_dict->nval;
345   
346   if (vfm_sink->init)
347     vfm_sink->init ();
348 }
349
350 /* Prepares the temporary case and compaction case. */
351 static void
352 make_temp_case (void)
353 {
354   temp_case = xmalloc (vfm_sink_info.case_size);
355
356   if (compaction_necessary)
357     compaction_case = xmalloc (sizeof (struct ccase)
358                                + sizeof (union value) * (compaction_nval - 1));
359 }
360
361 #if DEBUGGING
362 /* Returns the name of the variable that owns the index CCASE_INDEX
363    into ccase. */
364 static const char *
365 index_to_varname (int ccase_index)
366 {
367   int i;
368
369   for (i = 0; i < default_dict.nvar; i++)
370     {
371       struct variable *v = default_dict.var[i];
372       
373       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
374         return default_dict.var[i]->name;
375     }
376   return _("<NOVAR>");
377 }
378 #endif
379
380 /* Initializes temp_case from the vectors that say which `value's need
381    to be initialized just once, and which ones need to be
382    re-initialized before every case. */
383 static void
384 vector_initialization (void)
385 {
386   int i;
387   long *lp;
388   
389   /* Just once. */
390   for (i = 0; i < init_zero.n; i++)
391     temp_case->data[init_zero.vec[i]].f = 0.0;
392   for (i = 0; i < init_blanks.n; i++)
393     memset (temp_case->data[init_blanks.vec[i]].s, ' ', MAX_SHORT_STRING);
394
395   /* These vectors need to be repeatedly accessed, so we add a
396      sentinel to (hopefully) improve speed. */
397   vec_insert (&reinit_sysmis, -1);
398   vec_insert (&reinit_blanks, -1);
399
400   for (lp = reinit_sysmis.vec; *lp != -1;)
401     temp_case->data[*lp++].f = SYSMIS;
402   for (lp = reinit_blanks.vec; *lp != -1;)
403     memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
404   
405 #if DEBUGGING
406   printf ("vfm: init_zero=");
407   for (i = 0; i < init_zero.n; i++)
408     printf ("%s%s", i ? "," : "", index_to_varname (init_zero.vec[i]));
409   printf (" init_blanks=");
410   for (i = 0; i < init_blanks.n; i++)
411     printf ("%s%s", i ? "," : "", index_to_varname (init_blanks.vec[i]));
412   printf (" reinit_sysmis=");
413   for (lp = reinit_sysmis.vec; *lp != -1; lp++)
414     printf ("%s%s", lp != reinit_sysmis.vec ? "," : "",
415             index_to_varname (*lp));
416   printf (" reinit_blanks=");
417   for (lp = reinit_blanks.vec; *lp != -1; lp++)
418     printf ("%s%s", lp != reinit_blanks.vec ? "," : "",
419             index_to_varname (*lp));
420   printf ("\n");
421 #endif
422 }
423
424 /* Sets filter_index to an appropriate value. */
425 static void
426 setup_filter (void)
427 {
428   filter_index = -1;
429   
430   if (default_dict.filter_var[0])
431     {
432       struct variable *fv = find_variable (default_dict.filter_var);
433       
434       if (fv == NULL || fv->type == ALPHA)
435         default_dict.filter_var[0] = 0;
436       else
437         {
438           filter_index = fv->index;
439           filter_var = fv;
440         }
441     }
442 }
443
444 /* Sets all the lag-related variables based on value of n_lag. */
445 static void
446 setup_lag (void)
447 {
448   int i;
449   
450   if (n_lag == 0)
451     return;
452
453   lag_count = 0;
454   lag_head = 0;
455   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
456   for (i = 0; i < n_lag; i++)
457     lag_queue[i] = xmalloc (temp_dict->nval * sizeof **lag_queue);
458 }
459
460 /* There is a lot of potential confusion in the vfm and related
461    routines over the number of `value's at each stage of the process.
462    Here is each nval count, with explanation, as set up by
463    open_active_file():
464
465    vfm_source_info.nval: Number of `value's in the cases returned by
466    the source stream.  This value turns out not to be very useful, but
467    we maintain it anyway.
468
469    vfm_sink_info.nval: Number of `value's in the cases after all
470    transformations have been performed.  Never less than
471    vfm_source_info.nval.
472
473    temp_dict->nval: Number of `value's in the cases after the
474    transformations leading up to TEMPORARY have been performed.  If
475    TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
476    Never less than vfm_sink_info.nval.
477
478    compaction_nval: Number of `value's in the cases after the
479    transformations leading up to TEMPORARY have been performed and the
480    case has been compacted by compact_case(), if compaction is
481    necessary.  This the number of `value's in the cases saved by the
482    sink stream.  (However, note that the cases passed to the sink
483    stream have not yet been compacted.  It is the responsibility of
484    the data sink to call compact_case().)  This may be less than,
485    greater than, or equal to vfm_source_info.nval.  `compaction'
486    becomes the new value of default_dict.nval after the procedure is
487    completed.
488
489    default_dict.nval: This is often an alias for temp_dict->nval.  As
490    such it can really have no separate existence until the procedure
491    is complete.  For this reason it should *not* be referenced inside
492    the execution of a procedure. */
493 /* Makes all preparations for reading from the data source and writing
494    to the data sink. */
495 static void
496 open_active_file (void)
497 {
498   /* Sometimes we want to refer to the dictionary that applies to the
499      data actually written to the sink.  This is either temp_dict or
500      default_dict.  However, if TEMPORARY is not on, then temp_dict
501      does not apply.  So, we can set temp_dict to default_dict in this
502      case. */
503   if (!temporary)
504     {
505       temp_trns = n_trns;
506       temp_dict = &default_dict;
507     }
508
509   /* No cases passed to the procedure yet. */
510   case_count = 0;
511
512   /* The rest. */
513   prepare_for_writing ();
514   arrange_compaction ();
515   make_temp_case ();
516   vector_initialization ();
517   setup_randomize ();
518   discard_ctl_stack ();
519   setup_filter ();
520   setup_lag ();
521
522   /* Debug output. */
523   debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
524                  vfm_source->name, vfm_sink->name));
525   debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
526                  "temp_dict->nval=%d, compaction_nval=%d, "
527                  "default_dict.nval=%d\n",
528                  vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
529                  compaction_nval, default_dict.nval));
530 }
531 \f
532 /* Closes the active file. */
533 static void
534 close_active_file (void)
535 {
536   /* Close the current case group. */
537   if (case_count && end_func != NULL)
538     end_func ();
539
540   /* Stop lagging (catch up?). */
541   if (n_lag)
542     {
543       int i;
544       
545       for (i = 0; i < n_lag; i++)
546         free (lag_queue[i]);
547       free (lag_queue);
548       n_lag = 0;
549     }
550   
551   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
552      off TEMPORARY. */
553   if (temporary)
554     {
555       restore_dictionary (temp_dict);
556       temp_dict = NULL;
557     }
558
559   /* The default dictionary assumes the compacted data size. */
560   default_dict.nval = compaction_nval;
561     
562   /* Old data sink --> New data source. */
563   if (vfm_source && vfm_source->destroy_source)
564     vfm_source->destroy_source ();
565   
566   vfm_source = vfm_sink;
567   vfm_source_info.ncases = vfm_sink_info.ncases;
568   vfm_source_info.nval = compaction_nval;
569   vfm_source_info.case_size = (sizeof (struct ccase)
570                                + (compaction_nval - 1) * sizeof (union value));
571   if (vfm_source->mode)
572     vfm_source->mode ();
573
574   /* Old data sink is gone now. */
575   vfm_sink = NULL;
576
577   /* Finish compaction. */
578   if (compaction_necessary)
579     finish_compaction ();
580   cancel_temporary ();
581
582   /* Free temporary cases. */
583   free (temp_case);
584   temp_case = NULL;
585
586   free (compaction_case);
587   compaction_case = NULL;
588
589   /* Cancel PROCESS IF. */
590   expr_free (process_if_expr);
591   process_if_expr = NULL;
592
593   /* Cancel FILTER if temporary. */
594   if (filter_index != -1 && !FILTER_before_TEMPORARY)
595     default_dict.filter_var[0] = 0;
596
597   /* Cancel transformations. */
598   cancel_transformations ();
599
600   /* Clear value-initialization vectors. */
601   vec_clear (&init_zero);
602   vec_clear (&init_blanks);
603   vec_clear (&reinit_sysmis);
604   vec_clear (&reinit_blanks);
605
606   /* Turn off case limiter. */
607   default_dict.N = 0;
608
609   /* Clear VECTOR vectors. */
610   {
611     int i;
612
613     for (i = 0; i < nvec; i++)
614       free (vec[i].v);
615     free (vec);
616     vec = NULL;
617     nvec = 0;
618   }
619
620   debug_printf (("vfm: procedure complete\n\n"));
621 }
622 \f
623 /* Disk case stream. */
624
625 /* Associated files. */
626 FILE *disk_source_file;
627 FILE *disk_sink_file;
628
629 /* Initializes the disk sink. */
630 static void
631 disk_stream_init (void)
632 {
633   disk_sink_file = tmpfile ();
634   if (!disk_sink_file)
635     {
636       msg (ME, _("An error occurred attempting to create a temporary "
637                  "file for use as the active file: %s."),
638            strerror (errno));
639       err_failure ();
640     }
641 }
642
643 /* Reads all cases from the disk source and passes them one by one to
644    write_case(). */
645 static void
646 disk_stream_read (void)
647 {
648   int i;
649
650   for (i = 0; i < vfm_source_info.ncases; i++)
651     {
652       if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
653         {
654           msg (ME, _("An error occurred while attempting to read from "
655                "a temporary file created for the active file: %s."),
656                strerror (errno));
657           err_failure ();
658           return;
659         }
660
661       if (!write_case ())
662         return;
663     }
664 }
665
666 /* Writes temp_case to the disk sink. */
667 static void
668 disk_stream_write (void)
669 {
670   union value *src_case;
671
672   if (compaction_necessary)
673     {
674       compact_case (compaction_case, temp_case);
675       src_case = (union value *) compaction_case;
676     }
677   else src_case = (union value *) temp_case;
678
679   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
680               disk_sink_file) != 1)
681     {
682       msg (ME, _("An error occurred while attempting to write to a "
683                  "temporary file used as the active file: %s."),
684            strerror (errno));
685       err_failure ();
686     }
687 }
688
689 /* Switches the stream from a sink to a source. */
690 static void
691 disk_stream_mode (void)
692 {
693   /* Rewind the sink. */
694   if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
695     {
696       msg (ME, _("An error occurred while attempting to rewind a "
697                  "temporary file used as the active file: %s."),
698            strerror (errno));
699       err_failure ();
700     }
701   
702   /* Sink --> source variables. */
703   disk_source_file = disk_sink_file;
704 }
705
706 /* Destroys the source's internal data. */
707 static void
708 disk_stream_destroy_source (void)
709 {
710   if (disk_source_file)
711     {
712       fclose (disk_source_file);
713       disk_source_file = NULL;
714     }
715 }
716
717 /* Destroys the sink's internal data. */
718 static void
719 disk_stream_destroy_sink (void)
720 {
721   if (disk_sink_file)
722     {
723       fclose (disk_sink_file);
724       disk_sink_file = NULL;
725     }
726 }
727
728 /* Disk stream. */
729 struct case_stream vfm_disk_stream = 
730   {
731     disk_stream_init,
732     disk_stream_read,
733     disk_stream_write,
734     disk_stream_mode,
735     disk_stream_destroy_source,
736     disk_stream_destroy_sink,
737     "disk",
738   };
739 \f
740 /* Memory case stream. */
741
742 /* List of cases stored in the stream. */
743 struct case_list *memory_source_cases;
744 struct case_list *memory_sink_cases;
745
746 /* Current case. */
747 struct case_list *memory_sink_iter;
748
749 /* Maximum number of cases. */
750 int memory_sink_max_cases;
751
752 /* Initializes the memory stream variables for writing. */
753 static void
754 memory_stream_init (void)
755 {
756   memory_sink_cases = NULL;
757   memory_sink_iter = NULL;
758   
759   assert (compaction_nval);
760   memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
761 }
762
763 /* Reads the case stream from memory and passes it to write_case(). */
764 static void
765 memory_stream_read (void)
766 {
767   while (memory_source_cases != NULL)
768     {
769       memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
770       
771       {
772         struct case_list *current = memory_source_cases;
773         memory_source_cases = memory_source_cases->next;
774         free (current);
775       }
776       
777       if (!write_case ())
778         return;
779     }
780 }
781
782 /* Writes temp_case to the memory stream. */
783 static void
784 memory_stream_write (void)
785 {
786   struct case_list *new_case = malloc (sizeof (struct case_list)
787                                        + ((compaction_nval - 1)
788                                           * sizeof (union value)));
789
790   /* If we've got memory to spare then add it to the linked list. */
791   if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
792     {
793       if (compaction_necessary)
794         compact_case (&new_case->c, temp_case);
795       else
796         memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
797
798       /* Append case to linked list. */
799       if (memory_sink_cases)
800         memory_sink_iter = memory_sink_iter->next = new_case;
801       else
802         memory_sink_iter = memory_sink_cases = new_case;
803     }
804   else
805     {
806       /* Out of memory.  Write the active file to disk. */
807       struct case_list *cur, *next;
808
809       /* Notify the user. */
810       if (!new_case)
811         msg (MW, _("Virtual memory exhausted.  Paging active file "
812                    "to disk."));
813       else
814         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
815                    "overflowed.  Paging active file to disk."),
816              MAX_WORKSPACE / 1024, memory_sink_max_cases,
817              compaction_nval * sizeof (union value));
818
819       free (new_case);
820
821       /* Switch to a disk sink. */
822       vfm_sink = &vfm_disk_stream;
823       vfm_sink->init ();
824       paging = 1;
825
826       /* Terminate the list. */
827       if (memory_sink_iter)
828         memory_sink_iter->next = NULL;
829
830       /* Write the cases to disk and destroy them.  We can't call
831          vfm->sink->write() because of compaction. */
832       for (cur = memory_sink_cases; cur; cur = next)
833         {
834           next = cur->next;
835           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
836                       disk_sink_file) != 1)
837             {
838               msg (ME, _("An error occurred while attempting to "
839                          "write to a temporary file created as the "
840                          "active file, while paging to disk: %s."),
841                    strerror (errno));
842               err_failure ();
843             }
844           free (cur);
845         }
846
847       /* Write the current case to disk. */
848       vfm_sink->write ();
849     }
850 }
851
852 /* If the data is stored in memory, causes it to be written to disk.
853    To be called only *between* procedure()s, not within them. */
854 void
855 page_to_disk (void)
856 {
857   if (vfm_source == &vfm_memory_stream)
858     {
859       /* Switch to a disk sink. */
860       vfm_sink = &vfm_disk_stream;
861       vfm_sink->init ();
862       paging = 1;
863       
864       /* Write the cases to disk and destroy them.  We can't call
865          vfm->sink->write() because of compaction. */
866       {
867         struct case_list *cur, *next;
868         
869         for (cur = memory_source_cases; cur; cur = next)
870           {
871             next = cur->next;
872             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
873                         disk_sink_file) != 1)
874               {
875                 msg (ME, _("An error occurred while attempting to "
876                            "write to a temporary file created as the "
877                            "active file, while paging to disk: %s."),
878                      strerror (errno));
879                 err_failure ();
880               }
881             free (cur);
882           }
883       }
884       
885       vfm_source = &vfm_disk_stream;
886       vfm_source->mode ();
887
888       vfm_sink = NULL;
889     }
890 }
891
892 /* Switch the memory stream from sink to source mode. */
893 static void
894 memory_stream_mode (void)
895 {
896   /* Terminate the list. */
897   if (memory_sink_iter)
898     memory_sink_iter->next = NULL;
899
900   /* Sink --> source variables. */
901   memory_source_cases = memory_sink_cases;
902   memory_sink_cases = NULL;
903 }
904
905 /* Destroy all memory source data. */
906 static void
907 memory_stream_destroy_source (void)
908 {
909   struct case_list *cur, *next;
910   
911   for (cur = memory_source_cases; cur; cur = next)
912     {
913       next = cur->next;
914       free (cur);
915     }
916   memory_source_cases = NULL;
917 }
918
919 /* Destroy all memory sink data. */
920 static void
921 memory_stream_destroy_sink (void)
922 {
923   struct case_list *cur, *next;
924   
925   for (cur = memory_sink_cases; cur; cur = next)
926     {
927       next = cur->next;
928       free (cur);
929     }
930   memory_sink_cases = NULL;
931 }
932   
933 /* Memory stream. */
934 struct case_stream vfm_memory_stream = 
935   {
936     memory_stream_init,
937     memory_stream_read,
938     memory_stream_write,
939     memory_stream_mode,
940     memory_stream_destroy_source,
941     memory_stream_destroy_sink,
942     "memory",
943   };
944 \f
945 #include "debug-print.h"
946
947 /* Add temp_case to the lag queue. */
948 static void
949 lag_case (void)
950 {
951   if (lag_count < n_lag)
952     lag_count++;
953   memcpy (lag_queue[lag_head], temp_case, sizeof (union value) * temp_dict->nval);
954   if (++lag_head >= n_lag)
955     lag_head = 0;
956 }
957
958 /* Returns a pointer to the lagged case from N_BEFORE cases before the
959    current one, or NULL if there haven't been that many cases yet. */
960 struct ccase *
961 lagged_case (int n_before)
962 {
963   assert (n_before <= n_lag);
964   if (n_before > lag_count)
965     return NULL;
966   
967   {
968     int index = lag_head - n_before;
969     if (index < 0)
970       index += n_lag;
971     return lag_queue[index];
972   }
973 }
974    
975 /* Transforms temp_case and writes it to the replacement active file
976    if advisable.  Returns nonzero if more cases can be accepted, zero
977    otherwise.  Do not call this function again after it has returned
978    zero once.  */
979 int
980 procedure_write_case (void)
981 {
982   /* Index of current transformation. */
983   int cur_trns;
984
985   /* Return value: whether it's reasonable to write any more cases. */
986   int more_cases = 1;
987
988   debug_printf ((_("transform: ")));
989
990   cur_trns = f_trns;
991   for (;;)
992     {
993       /* Output the case if this is temp_trns. */
994       if (cur_trns == temp_trns)
995         {
996           debug_printf (("REC"));
997
998           if (n_lag)
999             lag_case ();
1000           
1001           vfm_sink_info.ncases++;
1002           vfm_sink->write ();
1003
1004           if (default_dict.N)
1005             more_cases = vfm_sink_info.ncases < default_dict.N;
1006         }
1007
1008       /* Are we done? */
1009       if (cur_trns >= n_trns)
1010         break;
1011       
1012       debug_printf (("$%d", cur_trns));
1013
1014       /* Decide which transformation should come next. */
1015       {
1016         int code;
1017         
1018         code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
1019         switch (code)
1020           {
1021           case -1:
1022             /* Next transformation. */
1023             cur_trns++;
1024             break;
1025           case -2:
1026             /* Delete this case. */
1027             goto done;
1028           default:
1029             /* Go to that transformation. */
1030             cur_trns = code;
1031             break;
1032           }
1033       }
1034     }
1035
1036   /* Call the beginning of group function. */
1037   if (!case_count && begin_func != NULL)
1038     begin_func ();
1039
1040   /* Call the procedure if there is one and FILTER and PROCESS IF
1041      don't prohibit it. */
1042   if (proc_func != NULL
1043       && !FILTERED
1044       && (process_if_expr == NULL ||
1045           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
1046     proc_func (temp_case);
1047
1048   case_count++;
1049   
1050 done:
1051   debug_putc ('\n', stdout);
1052   
1053   {
1054     long *lp;
1055
1056     /* This case is finished.  Initialize the variables for the next case. */
1057     for (lp = reinit_sysmis.vec; *lp != -1;)
1058       temp_case->data[*lp++].f = SYSMIS;
1059     for (lp = reinit_blanks.vec; *lp != -1;)
1060       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
1061   }
1062   
1063   /* Return previously determined value. */
1064   return more_cases;
1065 }
1066
1067 /* Appends TRNS to t_trns[], the list of all transformations to be
1068    performed on data as it is read from the active file. */
1069 void
1070 add_transformation (struct trns_header * trns)
1071 {
1072   if (n_trns >= m_trns)
1073     {
1074       m_trns += 16;
1075       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1076     }
1077   t_trns[n_trns] = trns;
1078   trns->index = n_trns++;
1079 }
1080
1081 /* Cancels all active transformations, including any transformations
1082    created by the input program. */
1083 void
1084 cancel_transformations (void)
1085 {
1086   int i;
1087   for (i = 0; i < n_trns; i++)
1088     {
1089       if (t_trns[i]->free)
1090         t_trns[i]->free (t_trns[i]);
1091       free (t_trns[i]);
1092     }
1093   n_trns = f_trns = 0;
1094   if (m_trns > 32)
1095     {
1096       free (t_trns);
1097       m_trns = 0;
1098     }
1099 }
1100
1101 /* Dumps out the values of all the split variables for the case C. */
1102 static void
1103 dump_splits (struct ccase *c)
1104 {
1105   struct variable **iter;
1106   struct tab_table *t;
1107   int i;
1108
1109   t = tab_create (3, default_dict.n_splits + 1, 0);
1110   tab_dim (t, tab_natural_dimensions);
1111   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, default_dict.n_splits);
1112   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, default_dict.n_splits);
1113   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1114   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1115   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1116   for (iter = default_dict.splits, i = 0; *iter; iter++, i++)
1117     {
1118       struct variable *v = *iter;
1119       char temp_buf[80];
1120       char *val_lab;
1121
1122       assert (v->type == NUMERIC || v->type == ALPHA);
1123       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1124       
1125       {
1126         union value val = c->data[v->fv];
1127         if (v->type == ALPHA)
1128           val.c = c->data[v->fv].s;
1129         data_out (temp_buf, &v->print, &val);
1130       }
1131       
1132       temp_buf[v->print.w] = 0;
1133       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1134
1135       val_lab = get_val_lab (v, c->data[v->fv], 0);
1136       if (val_lab)
1137         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1138     }
1139   tab_flags (t, SOMF_NO_TITLE);
1140   tab_submit (t);
1141 }
1142
1143 /* This procfunc is substituted for the user-supplied procfunc when
1144    SPLIT FILE is active.  This function forms a wrapper around that
1145    procfunc by dividing the input into series. */
1146 static int
1147 SPLIT_FILE_procfunc (struct ccase *c)
1148 {
1149   static struct ccase *prev_case;
1150   struct variable **iter;
1151
1152   /* The first case always begins a new series.  We also need to
1153      preserve the values of the case for later comparison. */
1154   if (case_count == 0)
1155     {
1156       if (prev_case)
1157         free (prev_case);
1158       prev_case = xmalloc (vfm_sink_info.case_size);
1159       memcpy (prev_case, c, vfm_sink_info.case_size);
1160
1161       dump_splits (c);
1162       if (virt_begin_func != NULL)
1163         virt_begin_func ();
1164       
1165       return virt_proc_func (c);
1166     }
1167
1168   /* Compare the value of each SPLIT FILE variable to the values on
1169      the previous case. */
1170   for (iter = default_dict.splits; *iter; iter++)
1171     {
1172       struct variable *v = *iter;
1173       
1174       switch (v->type)
1175         {
1176         case NUMERIC:
1177           if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
1178             goto not_equal;
1179           break;
1180         case ALPHA:
1181           if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
1182             goto not_equal;
1183           break;
1184         default:
1185           assert (0);
1186         }
1187     }
1188   return virt_proc_func (c);
1189   
1190 not_equal:
1191   /* The values of the SPLIT FILE variable are different from the
1192      values on the previous case.  That means that it's time to begin
1193      a new series. */
1194   if (end_func != NULL)
1195     end_func ();
1196   dump_splits (c);
1197   if (virt_begin_func != NULL)
1198     virt_begin_func ();
1199   memcpy (prev_case, c, vfm_sink_info.case_size);
1200   return virt_proc_func (c);
1201 }
1202 \f
1203 /* Case compaction. */
1204
1205 /* Copies case SRC to case DEST, compacting it in the process. */
1206 void
1207 compact_case (struct ccase *dest, const struct ccase *src)
1208 {
1209   int i;
1210   int nval = 0;
1211   
1212   assert (compaction_necessary);
1213
1214   if (temporary == 2)
1215     {
1216       if (dest != compaction_case)
1217         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1218       return;
1219     }
1220
1221   /* Copy all the variables except the scratch variables from SRC to
1222      DEST. */
1223   for (i = 0; i < default_dict.nvar; i++)
1224     {
1225       struct variable *v = default_dict.var[i];
1226       
1227       if (v->name[0] == '#')
1228         continue;
1229
1230       if (v->type == NUMERIC)
1231         dest->data[nval++] = src->data[v->fv];
1232       else
1233         {
1234           int w = DIV_RND_UP (v->width, sizeof (union value));
1235           
1236           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1237           nval += w;
1238         }
1239     }
1240 }
1241
1242 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1243 static void
1244 finish_compaction (void)
1245 {
1246   int copy_index = 0;
1247   int nval = 0;
1248   int i;
1249
1250   for (i = 0; i < default_dict.nvar; i++)
1251     {
1252       struct variable *v = default_dict.var[i];
1253
1254       if (v->name[0] == '#')
1255         {
1256           clear_variable (&default_dict, v);
1257           free (v);
1258           continue;
1259         }
1260
1261       v->fv = nval;
1262       if (v->type == NUMERIC)
1263         nval++;
1264       else
1265         nval += DIV_RND_UP (v->width, sizeof (union value));
1266       
1267       default_dict.var[copy_index++] = v;
1268     }
1269   if (copy_index != default_dict.nvar)
1270     {
1271       default_dict.var = xrealloc (default_dict.var,
1272                                    sizeof *default_dict.var * copy_index);
1273       default_dict.nvar = copy_index;
1274     }
1275 }
1276
1277