checkin of 0.3.0
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 /* AIX requires this to be the first thing in the file.  */
21 #include <config.h>
22 #if __GNUC__
23 #define alloca __builtin_alloca
24 #else
25 #if HAVE_ALLOCA_H
26 #include <alloca.h>
27 #else
28 #ifdef _AIX
29 #pragma alloca
30 #else
31 #ifndef alloca                  /* predefined by HP cc +Olibcalls */
32 char *alloca ();
33 #endif
34 #endif
35 #endif
36 #endif
37
38 #include <assert.h>
39 #include <errno.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #if HAVE_UNISTD_H
43 #include <unistd.h>     /* Required by SunOS4. */
44 #endif
45 #include "alloc.h"
46 #include "approx.h"
47 #include "do-ifP.h"
48 #include "error.h"
49 #include "expr.h"
50 #include "misc.h"
51 #include "random.h"
52 #include "som.h"
53 #include "str.h"
54 #include "tab.h"
55 #include "var.h"
56 #include "vector.h"
57 #include "vfm.h"
58 #include "vfmP.h"
59
60 /*
61    Virtual File Manager (vfm):
62
63    vfm is used to process data files.  It uses the model that data is
64    read from one stream (the data source), then written to another
65    (the data sink).  The data source is then deleted and the data sink
66    becomes the data source for the next procedure. */
67
68 #undef DEBUGGING
69 /*#define DEBUGGING 1 */
70 #include "debug-print.h"
71
72 /* This is used to read from the active file. */
73 struct case_stream *vfm_source;
74
75 /* `value' indexes to initialize to particular values for certain cases. */
76 struct long_vec reinit_sysmis;          /* SYSMIS for every case. */
77 struct long_vec reinit_blanks;          /* Blanks for every case. */
78 struct long_vec init_zero;              /* Zero for first case only. */
79 struct long_vec init_blanks;            /* Blanks for first case only. */
80
81 /* This is used to write to the replacement active file. */
82 struct case_stream *vfm_sink;
83
84 /* Information about the data source. */
85 struct stream_info vfm_source_info;
86
87 /* Information about the data sink. */
88 struct stream_info vfm_sink_info;
89
90 /* Filter variable and  `value' index. */
91 static struct variable *filter_var;
92 static int filter_index;
93
94 #define FILTERED                                                        \
95         (filter_index != -1                                             \
96          && (temp_case->data[filter_index].f == 0.0                     \
97              || temp_case->data[filter_index].f == SYSMIS               \
98              || is_num_user_missing (temp_case->data[filter_index].f,   \
99                                      filter_var)))
100
101 /* Nonzero if the case needs to have values deleted before being
102    stored, zero otherwise. */
103 int compaction_necessary;
104
105 /* Number of values after compaction, or the same as
106    vfm_sink_info.nval, if compaction is not necessary. */
107 int compaction_nval;
108
109 /* Temporary case buffer with enough room for `compaction_nval'
110    `value's. */
111 struct ccase *compaction_case;
112
113 /* Within a session, when paging is turned on, it is never turned back
114    off.  This policy might be too aggressive. */
115 static int paging = 0;
116
117 /* Time at which vfm was last invoked. */
118 time_t last_vfm_invocation;
119
120 /* Functions called during procedure processing. */
121 static int (*proc_func) (struct ccase *);       /* Called for each case. */
122 static int (*virt_proc_func) (struct ccase *);  /* From SPLIT_FILE_procfunc. */
123 static void (*begin_func) (void);       /* Called at beginning of a series. */
124 static void (*virt_begin_func) (void);  /* Called by SPLIT_FILE_procfunc. */
125 static void (*end_func) (void); /* Called after end of a series. */
126 int (*write_case) (void);
127
128 /* Number of cases passed to proc_func(). */
129 static int case_count;
130
131 /* Lag queue. */
132 int n_lag;                      /* Number of cases to lag. */
133 static int lag_count;           /* Number of cases in lag_queue so far. */
134 static int lag_head;            /* Index where next case will be added. */
135 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
136
137 static void open_active_file (void);
138 static void close_active_file (void);
139 static int SPLIT_FILE_procfunc (struct ccase *);
140 static void finish_compaction (void);
141 static void lag_case (void);
142 static int procedure_write_case (void);
143 \f
144 /* Public functions. */
145
146 /* Reads all the cases from the active file, transforms them by the
147    active set of transformations, calls PROCFUNC with CURCASE set to
148    the case and CASENUM set to the case number, and writes them to a
149    new active file.
150
151    Divides the active file into zero or more series of one or more
152    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
153    called after each series. */
154 void
155 procedure (void (*beginfunc) (void),
156            int (*procfunc) (struct ccase *curcase),
157            void (*endfunc) (void))
158 {
159   end_func = endfunc;
160   write_case = procedure_write_case;
161
162   if (default_dict.n_splits && procfunc != NULL)
163     {
164       virt_proc_func = procfunc;
165       proc_func = SPLIT_FILE_procfunc;
166       
167       virt_begin_func = beginfunc;
168       begin_func = NULL;
169     } else {
170       begin_func = beginfunc;
171       proc_func = procfunc;
172     }
173
174   last_vfm_invocation = time (NULL);
175
176   open_active_file ();
177   vfm_source->read ();
178   close_active_file ();
179 }
180 \f
181 /* Active file processing support.  Subtly different semantics from
182    procedure(). */
183
184 static int process_active_file_write_case (void);
185
186 /* The casefunc might want us to stop calling it. */
187 static int not_canceled;
188
189 /* Reads all the cases from the active file and passes them one-by-one
190    to CASEFUNC in temp_case.  Before any cases are passed, calls
191    BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
192    BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
193    file by calling process_active_file_output_case().
194
195    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
196 void
197 process_active_file (void (*beginfunc) (void),
198                      int (*casefunc) (struct ccase *curcase),
199                      void (*endfunc) (void))
200 {
201   proc_func = casefunc;
202   write_case = process_active_file_write_case;
203   not_canceled = 1;
204
205   open_active_file ();
206   beginfunc ();
207   
208   /* There doesn't necessarily need to be an active file. */
209   if (vfm_source)
210     vfm_source->read ();
211   
212   endfunc ();
213   close_active_file ();
214 }
215
216 /* Pass the current case to casefunc. */
217 static int
218 process_active_file_write_case (void)
219 {
220   /* Index of current transformation. */
221   int cur_trns;
222
223   for (cur_trns = f_trns ; cur_trns != temp_trns; )
224     {
225       int code;
226         
227       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
228       switch (code)
229         {
230         case -1:
231           /* Next transformation. */
232           cur_trns++;
233           break;
234         case -2:
235           /* Delete this case. */
236           goto done;
237         default:
238           /* Go to that transformation. */
239           cur_trns = code;
240           break;
241         }
242     }
243
244   if (n_lag)
245     lag_case ();
246           
247   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
248   if (not_canceled
249       && !FILTERED
250       && (process_if_expr == NULL ||
251           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
252     not_canceled = proc_func (temp_case);
253   
254   case_count++;
255   
256  done:
257   {
258     long *lp;
259
260     /* This case is finished.  Initialize the variables for the next case. */
261     for (lp = reinit_sysmis.vec; *lp != -1;)
262       temp_case->data[*lp++].f = SYSMIS;
263     for (lp = reinit_blanks.vec; *lp != -1;)
264       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
265   }
266   
267   return 1;
268 }
269
270 /* Write temp_case to the active file. */
271 void
272 process_active_file_output_case (void)
273 {
274   vfm_sink_info.ncases++;
275   vfm_sink->write ();
276 }
277 \f
278 /* Opening the active file. */
279
280 /* It might be usefully noted that the following several functions are
281    given in the order that they are called by open_active_file(). */
282
283 /* Prepare to write to the replacement active file. */
284 static void
285 prepare_for_writing (void)
286 {
287   /* FIXME: If ALL the conditions listed below hold true, then the
288      replacement active file is guaranteed to be identical to the
289      original active file:
290
291      1. TEMPORARY was the first transformation, OR, there were no
292      transformations at all.
293
294      2. Input is not coming from an input program.
295
296      3. Compaction is not necessary.
297
298      So, in this case, we shouldn't have to replace the active
299      file--it's just a waste of time and space. */
300
301   vfm_sink_info.ncases = 0;
302   vfm_sink_info.nval = default_dict.nval;
303   vfm_sink_info.case_size = (sizeof (struct ccase)
304                              + (default_dict.nval - 1) * sizeof (union value));
305   
306   if (vfm_sink == NULL)
307     {
308       if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
309           && !paging)
310         {
311           msg (MW, _("Workspace overflow predicted.  Max workspace is "
312                      "currently set to %d KB (%d cases at %d bytes each).  "
313                      "Paging active file to disk."),
314                MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
315                vfm_sink_info.case_size);
316           
317           paging = 1;
318         }
319       
320       vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
321     }
322 }
323
324 /* Arrange for compacting the output cases for storage. */
325 static void
326 arrange_compaction (void)
327 {
328   int count_values = 0;
329
330   {
331     int i;
332     
333     /* Count up the number of `value's that will be output. */
334     for (i = 0; i < temp_dict->nvar; i++)
335       if (temp_dict->var[i]->name[0] != '#')
336         {
337           assert (temp_dict->var[i]->nv > 0);
338           count_values += temp_dict->var[i]->nv;
339         }
340     assert (temporary == 2 || count_values <= temp_dict->nval);
341   }
342   
343   /* Compaction is only necessary if the number of `value's to output
344      differs from the number already present. */
345   compaction_nval = count_values;
346   compaction_necessary = temporary == 2 || count_values != temp_dict->nval;
347   
348   if (vfm_sink->init)
349     vfm_sink->init ();
350 }
351
352 /* Prepares the temporary case and compaction case. */
353 static void
354 make_temp_case (void)
355 {
356   temp_case = xmalloc (vfm_sink_info.case_size);
357
358   if (compaction_necessary)
359     compaction_case = xmalloc (sizeof (struct ccase)
360                                + sizeof (union value) * (compaction_nval - 1));
361   
362 #if __CHECKER__
363   /* Initialize the unused trailing parts of string variables to avoid
364      spurious warnings from Checker. */
365   {
366     int i;
367     
368     for (i = 0; i < default_dict.nvar; i++)
369       {
370         struct variable *v = default_dict.var[i];
371       
372         if (v->type == ALPHA && v->width % 8 != 0)
373           memcpy (&temp_case->data[v->fv + v->nv - 1]
374                   .s[v->width % 8], _("!ERROR!"), 8 - v->width % 8);
375       }
376   }
377 #endif
378 }
379
380 #if DEBUGGING
381 /* Returns the name of the variable that owns the index CCASE_INDEX
382    into ccase. */
383 static const char *
384 index_to_varname (int ccase_index)
385 {
386   int i;
387
388   for (i = 0; i < default_dict.nvar; i++)
389     {
390       variable *v = default_dict.var[i];
391       
392       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
393         return default_dict.var[i]->name;
394     }
395   return _("<NOVAR>");
396 }
397 #endif
398
399 /* Initializes temp_case from the vectors that say which `value's need
400    to be initialized just once, and which ones need to be
401    re-initialized before every case. */
402 static void
403 vector_initialization (void)
404 {
405   int i;
406   long *lp;
407   
408   /* Just once. */
409   for (i = 0; i < init_zero.n; i++)
410     temp_case->data[init_zero.vec[i]].f = 0.0;
411   for (i = 0; i < init_blanks.n; i++)
412     memset (temp_case->data[init_blanks.vec[i]].s, ' ', MAX_SHORT_STRING);
413
414   /* These vectors need to be repeatedly accessed, so we add a
415      sentinel to (hopefully) improve speed. */
416   vec_insert (&reinit_sysmis, -1);
417   vec_insert (&reinit_blanks, -1);
418
419   for (lp = reinit_sysmis.vec; *lp != -1;)
420     temp_case->data[*lp++].f = SYSMIS;
421   for (lp = reinit_blanks.vec; *lp != -1;)
422     memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
423   
424 #if DEBUGGING
425   printf ("vfm: init_zero=");
426   for (i = 0; i < init_zero.n; i++)
427     printf ("%s%s", i ? "," : "", index_to_varname (init_zero.vec[i]));
428   printf (" init_blanks=");
429   for (i = 0; i < init_blanks.n; i++)
430     printf ("%s%s", i ? "," : "", index_to_varname (init_blanks.vec[i]));
431   printf (" reinit_sysmis=");
432   for (lp = reinit_sysmis.vec; *lp != -1; lp++)
433     printf ("%s%s", lp != reinit_sysmis.vec ? "," : "",
434             index_to_varname (*lp));
435   printf (" reinit_blanks=");
436   for (lp = reinit_blanks.vec; *lp != -1; lp++)
437     printf ("%s%s", lp != reinit_blanks.vec ? "," : "",
438             index_to_varname (*lp));
439   printf ("\n");
440 #endif
441 }
442
443 /* Sets filter_index to an appropriate value. */
444 static void
445 setup_filter (void)
446 {
447   filter_index = -1;
448   
449   if (default_dict.filter_var[0])
450     {
451       struct variable *fv = find_variable (default_dict.filter_var);
452       
453       if (fv == NULL || fv->type == ALPHA)
454         default_dict.filter_var[0] = 0;
455       else
456         {
457           filter_index = fv->index;
458           filter_var = fv;
459         }
460     }
461 }
462
463 /* Sets all the lag-related variables based on value of n_lag. */
464 static void
465 setup_lag (void)
466 {
467   int i;
468   
469   if (n_lag == 0)
470     return;
471
472   lag_count = 0;
473   lag_head = 0;
474   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
475   for (i = 0; i < n_lag; i++)
476     lag_queue[i] = xmalloc (temp_dict->nval * sizeof **lag_queue);
477 }
478
479 /* There is a lot of potential confusion in the vfm and related
480    routines over the number of `value's at each stage of the process.
481    Here is each nval count, with explanation, as set up by
482    open_active_file():
483
484    vfm_source_info.nval: Number of `value's in the cases returned by
485    the source stream.  This value turns out not to be very useful, but
486    we maintain it anyway.
487
488    vfm_sink_info.nval: Number of `value's in the cases after all
489    transformations have been performed.  Never less than
490    vfm_source_info.nval.
491
492    temp_dict->nval: Number of `value's in the cases after the
493    transformations leading up to TEMPORARY have been performed.  If
494    TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
495    Never less than vfm_sink_info.nval.
496
497    compaction_nval: Number of `value's in the cases after the
498    transformations leading up to TEMPORARY have been performed and the
499    case has been compacted by compact_case(), if compaction is
500    necessary.  This the number of `value's in the cases saved by the
501    sink stream.  (However, note that the cases passed to the sink
502    stream have not yet been compacted.  It is the responsibility of
503    the data sink to call compact_case().)  This may be less than,
504    greater than, or equal to vfm_source_info.nval.  `compaction'
505    becomes the new value of default_dict.nval after the procedure is
506    completed.
507
508    default_dict.nval: This is often an alias for temp_dict->nval.  As
509    such it can really have no separate existence until the procedure
510    is complete.  For this reason it should *not* be referenced inside
511    the execution of a procedure. */
512 /* Makes all preparations for reading from the data source and writing
513    to the data sink. */
514 static void
515 open_active_file (void)
516 {
517   /* Sometimes we want to refer to the dictionary that applies to the
518      data actually written to the sink.  This is either temp_dict or
519      default_dict.  However, if TEMPORARY is not on, then temp_dict
520      does not apply.  So, we can set temp_dict to default_dict in this
521      case. */
522   if (!temporary)
523     {
524       temp_trns = n_trns;
525       temp_dict = &default_dict;
526     }
527
528   /* No cases passed to the procedure yet. */
529   case_count = 0;
530
531   /* The rest. */
532   prepare_for_writing ();
533   arrange_compaction ();
534   make_temp_case ();
535   vector_initialization ();
536   setup_randomize ();
537   discard_ctl_stack ();
538   setup_filter ();
539   setup_lag ();
540
541   /* Debug output. */
542   debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
543                  vfm_source->name, vfm_sink->name));
544   debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
545                  "temp_dict->nval=%d, compaction_nval=%d, "
546                  "default_dict.nval=%d\n",
547                  vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
548                  compaction_nval, default_dict.nval));
549 }
550 \f
551 /* Closes the active file. */
552 static void
553 close_active_file (void)
554 {
555   /* Close the current case group. */
556   if (case_count && end_func != NULL)
557     end_func ();
558
559   /* Stop lagging (catch up?). */
560   if (n_lag)
561     {
562       int i;
563       
564       for (i = 0; i < n_lag; i++)
565         free (lag_queue[i]);
566       free (lag_queue);
567       n_lag = 0;
568     }
569   
570   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
571      off TEMPORARY. */
572   if (temporary)
573     {
574       restore_dictionary (temp_dict);
575       temp_dict = NULL;
576     }
577
578   /* The default dictionary assumes the compacted data size. */
579   default_dict.nval = compaction_nval;
580     
581   /* Old data sink --> New data source. */
582   if (vfm_source && vfm_source->destroy_source)
583     vfm_source->destroy_source ();
584   
585   vfm_source = vfm_sink;
586   vfm_source_info.ncases = vfm_sink_info.ncases;
587   vfm_source_info.nval = compaction_nval;
588   vfm_source_info.case_size = (sizeof (struct ccase)
589                                + (compaction_nval - 1) * sizeof (union value));
590   if (vfm_source->mode)
591     vfm_source->mode ();
592
593   /* Old data sink is gone now. */
594   vfm_sink = NULL;
595
596   /* Finish compaction. */
597   if (compaction_necessary)
598     finish_compaction ();
599   cancel_temporary ();
600
601   /* Free temporary cases. */
602   free (temp_case);
603   temp_case = NULL;
604
605   free (compaction_case);
606   compaction_case = NULL;
607
608   /* Cancel PROCESS IF. */
609   expr_free (process_if_expr);
610   process_if_expr = NULL;
611
612   /* Cancel FILTER if temporary. */
613   if (filter_index != -1 && !FILTER_before_TEMPORARY)
614     default_dict.filter_var[0] = 0;
615
616   /* Cancel transformations. */
617   cancel_transformations ();
618
619   /* Clear value-initialization vectors. */
620   vec_clear (&init_zero);
621   vec_clear (&init_blanks);
622   vec_clear (&reinit_sysmis);
623   vec_clear (&reinit_blanks);
624
625   /* Turn off case limiter. */
626   default_dict.N = 0;
627
628   /* Clear VECTOR vectors. */
629   {
630     int i;
631
632     for (i = 0; i < nvec; i++)
633       free (vec[i].v);
634     free (vec);
635     vec = NULL;
636     nvec = 0;
637   }
638
639   debug_printf (("vfm: procedure complete\n\n"));
640 }
641 \f
642 /* Disk case stream. */
643
644 /* Associated files. */
645 FILE *disk_source_file;
646 FILE *disk_sink_file;
647
648 /* Initializes the disk sink. */
649 static void
650 disk_stream_init (void)
651 {
652   disk_sink_file = tmpfile ();
653   if (!disk_sink_file)
654     {
655       msg (ME, _("An error occurred attempting to create a temporary "
656                  "file for use as the active file: %s."),
657            strerror (errno));
658       err_failure ();
659     }
660 }
661
662 /* Reads all cases from the disk source and passes them one by one to
663    write_case(). */
664 static void
665 disk_stream_read (void)
666 {
667   int i;
668
669   for (i = 0; i < vfm_source_info.ncases; i++)
670     {
671       if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
672         {
673           msg (ME, _("An error occurred while attempting to read from "
674                "a temporary file created for the active file: %s."),
675                strerror (errno));
676           err_failure ();
677           return;
678         }
679
680       if (!write_case ())
681         return;
682     }
683 }
684
685 /* Writes temp_case to the disk sink. */
686 static void
687 disk_stream_write (void)
688 {
689   union value *src_case;
690
691   if (compaction_necessary)
692     {
693       compact_case (compaction_case, temp_case);
694       src_case = (union value *) compaction_case;
695     }
696   else src_case = (union value *) temp_case;
697
698   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
699               disk_sink_file) != 1)
700     {
701       msg (ME, _("An error occurred while attempting to write to a "
702                  "temporary file used as the active file: %s."),
703            strerror (errno));
704       err_failure ();
705     }
706 }
707
708 /* Switches the stream from a sink to a source. */
709 static void
710 disk_stream_mode (void)
711 {
712   /* Rewind the sink. */
713   if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
714     {
715       msg (ME, _("An error occurred while attempting to rewind a "
716                  "temporary file used as the active file: %s."),
717            strerror (errno));
718       err_failure ();
719     }
720   
721   /* Sink --> source variables. */
722   disk_source_file = disk_sink_file;
723 }
724
725 /* Destroys the source's internal data. */
726 static void
727 disk_stream_destroy_source (void)
728 {
729   if (disk_source_file)
730     {
731       fclose (disk_source_file);
732       disk_source_file = NULL;
733     }
734 }
735
736 /* Destroys the sink's internal data. */
737 static void
738 disk_stream_destroy_sink (void)
739 {
740   if (disk_sink_file)
741     {
742       fclose (disk_sink_file);
743       disk_sink_file = NULL;
744     }
745 }
746
747 /* Disk stream. */
748 struct case_stream vfm_disk_stream = 
749   {
750     disk_stream_init,
751     disk_stream_read,
752     disk_stream_write,
753     disk_stream_mode,
754     disk_stream_destroy_source,
755     disk_stream_destroy_sink,
756     "disk",
757   };
758 \f
759 /* Memory case stream. */
760
761 /* List of cases stored in the stream. */
762 struct case_list *memory_source_cases;
763 struct case_list *memory_sink_cases;
764
765 /* Current case. */
766 struct case_list *memory_sink_iter;
767
768 /* Maximum number of cases. */
769 int memory_sink_max_cases;
770
771 /* Initializes the memory stream variables for writing. */
772 static void
773 memory_stream_init (void)
774 {
775   memory_sink_cases = NULL;
776   memory_sink_iter = NULL;
777   
778   assert (compaction_nval);
779   memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
780 }
781
782 /* Reads the case stream from memory and passes it to write_case(). */
783 static void
784 memory_stream_read (void)
785 {
786   while (memory_source_cases != NULL)
787     {
788       memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
789       
790       {
791         struct case_list *current = memory_source_cases;
792         memory_source_cases = memory_source_cases->next;
793         free (current);
794       }
795       
796       if (!write_case ())
797         return;
798     }
799 }
800
801 /* Writes temp_case to the memory stream. */
802 static void
803 memory_stream_write (void)
804 {
805   struct case_list *new_case = malloc (sizeof (struct case_list)
806                                        + ((compaction_nval - 1)
807                                           * sizeof (union value)));
808
809   /* If we've got memory to spare then add it to the linked list. */
810   if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
811     {
812       if (compaction_necessary)
813         compact_case (&new_case->c, temp_case);
814       else
815         memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
816
817       /* Append case to linked list. */
818       if (memory_sink_cases)
819         memory_sink_iter = memory_sink_iter->next = new_case;
820       else
821         memory_sink_iter = memory_sink_cases = new_case;
822     }
823   else
824     {
825       /* Out of memory.  Write the active file to disk. */
826       struct case_list *cur, *next;
827
828       /* Notify the user. */
829       if (!new_case)
830         msg (MW, _("Virtual memory exhausted.  Paging active file "
831                    "to disk."));
832       else
833         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
834                    "overflowed.  Paging active file to disk."),
835              MAX_WORKSPACE / 1024, memory_sink_max_cases,
836              compaction_nval * sizeof (union value));
837
838       free (new_case);
839
840       /* Switch to a disk sink. */
841       vfm_sink = &vfm_disk_stream;
842       vfm_sink->init ();
843       paging = 1;
844
845       /* Terminate the list. */
846       if (memory_sink_iter)
847         memory_sink_iter->next = NULL;
848
849       /* Write the cases to disk and destroy them.  We can't call
850          vfm->sink->write() because of compaction. */
851       for (cur = memory_sink_cases; cur; cur = next)
852         {
853           next = cur->next;
854           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
855                       disk_sink_file) != 1)
856             {
857               msg (ME, _("An error occurred while attempting to "
858                          "write to a temporary file created as the "
859                          "active file, while paging to disk: %s."),
860                    strerror (errno));
861               err_failure ();
862             }
863           free (cur);
864         }
865
866       /* Write the current case to disk. */
867       vfm_sink->write ();
868     }
869 }
870
871 /* If the data is stored in memory, causes it to be written to disk.
872    To be called only *between* procedure()s, not within them. */
873 void
874 page_to_disk (void)
875 {
876   if (vfm_source == &vfm_memory_stream)
877     {
878       /* Switch to a disk sink. */
879       vfm_sink = &vfm_disk_stream;
880       vfm_sink->init ();
881       paging = 1;
882       
883       /* Write the cases to disk and destroy them.  We can't call
884          vfm->sink->write() because of compaction. */
885       {
886         struct case_list *cur, *next;
887         
888         for (cur = memory_source_cases; cur; cur = next)
889           {
890             next = cur->next;
891             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
892                         disk_sink_file) != 1)
893               {
894                 msg (ME, _("An error occurred while attempting to "
895                            "write to a temporary file created as the "
896                            "active file, while paging to disk: %s."),
897                      strerror (errno));
898                 err_failure ();
899               }
900             free (cur);
901           }
902       }
903       
904       vfm_source = &vfm_disk_stream;
905       vfm_source->mode ();
906
907       vfm_sink = NULL;
908     }
909 }
910
911 /* Switch the memory stream from sink to source mode. */
912 static void
913 memory_stream_mode (void)
914 {
915   /* Terminate the list. */
916   if (memory_sink_iter)
917     memory_sink_iter->next = NULL;
918
919   /* Sink --> source variables. */
920   memory_source_cases = memory_sink_cases;
921   memory_sink_cases = NULL;
922 }
923
924 /* Destroy all memory source data. */
925 static void
926 memory_stream_destroy_source (void)
927 {
928   struct case_list *cur, *next;
929   
930   for (cur = memory_source_cases; cur; cur = next)
931     {
932       next = cur->next;
933       free (cur);
934     }
935   memory_source_cases = NULL;
936 }
937
938 /* Destroy all memory sink data. */
939 static void
940 memory_stream_destroy_sink (void)
941 {
942   struct case_list *cur, *next;
943   
944   for (cur = memory_sink_cases; cur; cur = next)
945     {
946       next = cur->next;
947       free (cur);
948     }
949   memory_sink_cases = NULL;
950 }
951   
952 /* Memory stream. */
953 struct case_stream vfm_memory_stream = 
954   {
955     memory_stream_init,
956     memory_stream_read,
957     memory_stream_write,
958     memory_stream_mode,
959     memory_stream_destroy_source,
960     memory_stream_destroy_sink,
961     "memory",
962   };
963 \f
964 #undef DEBUGGING
965 #include "debug-print.h"
966
967 /* Add temp_case to the lag queue. */
968 static void
969 lag_case (void)
970 {
971   if (lag_count < n_lag)
972     lag_count++;
973   memcpy (lag_queue[lag_head], temp_case, sizeof (union value) * temp_dict->nval);
974   if (++lag_head >= n_lag)
975     lag_head = 0;
976 }
977
978 /* Returns a pointer to the lagged case from N_BEFORE cases before the
979    current one, or NULL if there haven't been that many cases yet. */
980 struct ccase *
981 lagged_case (int n_before)
982 {
983   assert (n_before <= n_lag);
984   if (n_before > lag_count)
985     return NULL;
986   
987   {
988     int index = lag_head - n_before;
989     if (index < 0)
990       index += n_lag;
991     return lag_queue[index];
992   }
993 }
994    
995 /* Transforms temp_case and writes it to the replacement active file
996    if advisable.  Returns nonzero if more cases can be accepted, zero
997    otherwise.  Do not call this function again after it has returned
998    zero once.  */
999 int
1000 procedure_write_case (void)
1001 {
1002   /* Index of current transformation. */
1003   int cur_trns;
1004
1005   /* Return value: whether it's reasonable to write any more cases. */
1006   int more_cases = 1;
1007
1008   debug_printf ((_("transform: ")));
1009
1010   cur_trns = f_trns;
1011   for (;;)
1012     {
1013       /* Output the case if this is temp_trns. */
1014       if (cur_trns == temp_trns)
1015         {
1016           debug_printf (("REC"));
1017
1018           if (n_lag)
1019             lag_case ();
1020           
1021           vfm_sink_info.ncases++;
1022           vfm_sink->write ();
1023
1024           if (default_dict.N)
1025             more_cases = vfm_sink_info.ncases < default_dict.N;
1026         }
1027
1028       /* Are we done? */
1029       if (cur_trns >= n_trns)
1030         break;
1031       
1032       debug_printf (("$%d", cur_trns));
1033
1034       /* Decide which transformation should come next. */
1035       {
1036         int code;
1037         
1038         code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
1039         switch (code)
1040           {
1041           case -1:
1042             /* Next transformation. */
1043             cur_trns++;
1044             break;
1045           case -2:
1046             /* Delete this case. */
1047             goto done;
1048           default:
1049             /* Go to that transformation. */
1050             cur_trns = code;
1051             break;
1052           }
1053       }
1054     }
1055
1056   /* Call the beginning of group function. */
1057   if (!case_count && begin_func != NULL)
1058     begin_func ();
1059
1060   /* Call the procedure if there is one and FILTER and PROCESS IF
1061      don't prohibit it. */
1062   if (proc_func != NULL
1063       && !FILTERED
1064       && (process_if_expr == NULL ||
1065           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
1066     proc_func (temp_case);
1067
1068   case_count++;
1069   
1070 done:
1071   debug_putc ('\n', stdout);
1072   
1073   {
1074     long *lp;
1075
1076     /* This case is finished.  Initialize the variables for the next case. */
1077     for (lp = reinit_sysmis.vec; *lp != -1;)
1078       temp_case->data[*lp++].f = SYSMIS;
1079     for (lp = reinit_blanks.vec; *lp != -1;)
1080       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
1081   }
1082   
1083   /* Return previously determined value. */
1084   return more_cases;
1085 }
1086
1087 /* Appends TRNS to t_trns[], the list of all transformations to be
1088    performed on data as it is read from the active file. */
1089 void
1090 add_transformation (struct trns_header * trns)
1091 {
1092   if (n_trns >= m_trns)
1093     {
1094       m_trns += 16;
1095       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1096     }
1097   t_trns[n_trns] = trns;
1098   trns->index = n_trns++;
1099 }
1100
1101 /* Cancels all active transformations, including any transformations
1102    created by the input program. */
1103 void
1104 cancel_transformations (void)
1105 {
1106   int i;
1107   for (i = 0; i < n_trns; i++)
1108     {
1109       if (t_trns[i]->free)
1110         t_trns[i]->free (t_trns[i]);
1111       free (t_trns[i]);
1112     }
1113   n_trns = f_trns = 0;
1114   if (m_trns > 32)
1115     {
1116       free (t_trns);
1117       m_trns = 0;
1118     }
1119 }
1120
1121 /* Dumps out the values of all the split variables for the case C. */
1122 static void
1123 dump_splits (struct ccase *c)
1124 {
1125   struct variable **iter;
1126   struct tab_table *t;
1127   int i;
1128
1129   t = tab_create (3, default_dict.n_splits + 1, 0);
1130   tab_dim (t, tab_natural_dimensions);
1131   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, default_dict.n_splits);
1132   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, default_dict.n_splits);
1133   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1134   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1135   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1136   for (iter = default_dict.splits, i = 0; *iter; iter++, i++)
1137     {
1138       struct variable *v = *iter;
1139       char temp_buf[80];
1140       char *val_lab;
1141
1142       assert (v->type == NUMERIC || v->type == ALPHA);
1143       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1144       
1145       {
1146         union value val = c->data[v->fv];
1147         if (v->type == ALPHA)
1148           val.c = c->data[v->fv].s;
1149         data_out (temp_buf, &v->print, &val);
1150       }
1151       
1152       temp_buf[v->print.w] = 0;
1153       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1154
1155       val_lab = get_val_lab (v, c->data[v->fv], 0);
1156       if (val_lab)
1157         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1158     }
1159   tab_flags (t, SOMF_NO_TITLE);
1160   tab_submit (t);
1161 }
1162
1163 /* This procfunc is substituted for the user-supplied procfunc when
1164    SPLIT FILE is active.  This function forms a wrapper around that
1165    procfunc by dividing the input into series. */
1166 static int
1167 SPLIT_FILE_procfunc (struct ccase *c)
1168 {
1169   static struct ccase *prev_case;
1170   struct variable **iter;
1171
1172   /* The first case always begins a new series.  We also need to
1173      preserve the values of the case for later comparison. */
1174   if (case_count == 0)
1175     {
1176       if (prev_case)
1177         free (prev_case);
1178       prev_case = xmalloc (vfm_sink_info.case_size);
1179       memcpy (prev_case, c, vfm_sink_info.case_size);
1180
1181       dump_splits (c);
1182       if (virt_begin_func != NULL)
1183         virt_begin_func ();
1184       
1185       return virt_proc_func (c);
1186     }
1187
1188   /* Compare the value of each SPLIT FILE variable to the values on
1189      the previous case. */
1190   for (iter = default_dict.splits; *iter; iter++)
1191     {
1192       struct variable *v = *iter;
1193       
1194       switch (v->type)
1195         {
1196         case NUMERIC:
1197           if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
1198             goto not_equal;
1199           break;
1200         case ALPHA:
1201           if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
1202             goto not_equal;
1203           break;
1204         default:
1205           assert (0);
1206         }
1207     }
1208   return virt_proc_func (c);
1209   
1210 not_equal:
1211   /* The values of the SPLIT FILE variable are different from the
1212      values on the previous case.  That means that it's time to begin
1213      a new series. */
1214   if (end_func != NULL)
1215     end_func ();
1216   dump_splits (c);
1217   if (virt_begin_func != NULL)
1218     virt_begin_func ();
1219   memcpy (prev_case, c, vfm_sink_info.case_size);
1220   return virt_proc_func (c);
1221 }
1222 \f
1223 /* Case compaction. */
1224
1225 /* Copies case SRC to case DEST, compacting it in the process. */
1226 void
1227 compact_case (struct ccase *dest, const struct ccase *src)
1228 {
1229   int i;
1230   int nval = 0;
1231   
1232   assert (compaction_necessary);
1233
1234   if (temporary == 2)
1235     {
1236       if (dest != compaction_case)
1237         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1238       return;
1239     }
1240
1241   /* Copy all the variables except the scratch variables from SRC to
1242      DEST. */
1243   for (i = 0; i < default_dict.nvar; i++)
1244     {
1245       struct variable *v = default_dict.var[i];
1246       
1247       if (v->name[0] == '#')
1248         continue;
1249
1250       if (v->type == NUMERIC)
1251         dest->data[nval++] = src->data[v->fv];
1252       else
1253         {
1254           int w = DIV_RND_UP (v->width, sizeof (union value));
1255           
1256           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1257           nval += w;
1258         }
1259     }
1260 }
1261
1262 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1263 static void
1264 finish_compaction (void)
1265 {
1266   int copy_index = 0;
1267   int nval = 0;
1268   int i;
1269
1270   for (i = 0; i < default_dict.nvar; i++)
1271     {
1272       struct variable *v = default_dict.var[i];
1273
1274       if (v->name[0] == '#')
1275         {
1276           clear_variable (&default_dict, v);
1277           free (v);
1278           continue;
1279         }
1280
1281       v->fv = nval;
1282       if (v->type == NUMERIC)
1283         nval++;
1284       else
1285         nval += DIV_RND_UP (v->width, sizeof (union value));
1286       
1287       default_dict.var[copy_index++] = v;
1288     }
1289   if (copy_index != default_dict.nvar)
1290     {
1291       default_dict.var = xrealloc (default_dict.var,
1292                                    sizeof *default_dict.var * copy_index);
1293       default_dict.nvar = copy_index;
1294     }
1295 }
1296
1297