Sat Dec 27 16:16:49 2003 Ben Pfaff <blp@gnu.org>
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 /* AIX requires this to be the first thing in the file.  */
21 #include <config.h>
22 #if __GNUC__
23 #define alloca __builtin_alloca
24 #else
25 #if HAVE_ALLOCA_H
26 #include <alloca.h>
27 #else
28 #ifdef _AIX
29 #pragma alloca
30 #else
31 #ifndef alloca                  /* predefined by HP cc +Olibcalls */
32 char *alloca ();
33 #endif
34 #endif
35 #endif
36 #endif
37
38 #include "vfm.h"
39 #include "vfmP.h"
40 #include <assert.h>
41 #include <errno.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #if HAVE_UNISTD_H
45 #include <unistd.h>     /* Required by SunOS4. */
46 #endif
47 #include "alloc.h"
48 #include "approx.h"
49 #include "do-ifP.h"
50 #include "error.h"
51 #include "expr.h"
52 #include "misc.h"
53 #include "random.h"
54 #include "som.h"
55 #include "str.h"
56 #include "tab.h"
57 #include "var.h"
58 #include "value-labels.h"
59
60 /*
61    Virtual File Manager (vfm):
62
63    vfm is used to process data files.  It uses the model that data is
64    read from one stream (the data source), then written to another
65    (the data sink).  The data source is then deleted and the data sink
66    becomes the data source for the next procedure. */
67
68 #include "debug-print.h"
69
70 /* This is used to read from the active file. */
71 struct case_stream *vfm_source;
72
73 /* `value' indexes to initialize to particular values for certain cases. */
74 struct long_vec reinit_sysmis;          /* SYSMIS for every case. */
75 struct long_vec reinit_blanks;          /* Blanks for every case. */
76 struct long_vec init_zero;              /* Zero for first case only. */
77 struct long_vec init_blanks;            /* Blanks for first case only. */
78
79 /* This is used to write to the replacement active file. */
80 struct case_stream *vfm_sink;
81
82 /* Information about the data source. */
83 struct stream_info vfm_source_info;
84
85 /* Information about the data sink. */
86 struct stream_info vfm_sink_info;
87
88 /* Filter variable and  `value' index. */
89 static struct variable *filter_var;
90 static int filter_index;
91
92 #define FILTERED                                                        \
93         (filter_index != -1                                             \
94          && (temp_case->data[filter_index].f == 0.0                     \
95              || temp_case->data[filter_index].f == SYSMIS               \
96              || is_num_user_missing (temp_case->data[filter_index].f,   \
97                                      filter_var)))
98
99 /* Nonzero if the case needs to have values deleted before being
100    stored, zero otherwise. */
101 int compaction_necessary;
102
103 /* Number of values after compaction, or the same as
104    vfm_sink_info.nval, if compaction is not necessary. */
105 int compaction_nval;
106
107 /* Temporary case buffer with enough room for `compaction_nval'
108    `value's. */
109 struct ccase *compaction_case;
110
111 /* Within a session, when paging is turned on, it is never turned back
112    off.  This policy might be too aggressive. */
113 static int paging = 0;
114
115 /* Time at which vfm was last invoked. */
116 time_t last_vfm_invocation;
117
118 /* Functions called during procedure processing. */
119 static int (*proc_func) (struct ccase *);       /* Called for each case. */
120 static int (*virt_proc_func) (struct ccase *);  /* From SPLIT_FILE_procfunc. */
121 static void (*begin_func) (void);       /* Called at beginning of a series. */
122 static void (*virt_begin_func) (void);  /* Called by SPLIT_FILE_procfunc. */
123 static void (*end_func) (void); /* Called after end of a series. */
124 int (*write_case) (void);
125
126 /* Number of cases passed to proc_func(). */
127 static int case_count;
128
129 /* Lag queue. */
130 int n_lag;                      /* Number of cases to lag. */
131 static int lag_count;           /* Number of cases in lag_queue so far. */
132 static int lag_head;            /* Index where next case will be added. */
133 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
134
135 static void open_active_file (void);
136 static void close_active_file (void);
137 static int SPLIT_FILE_procfunc (struct ccase *);
138 static void finish_compaction (void);
139 static void lag_case (void);
140 static int procedure_write_case (void);
141 \f
142 /* Public functions. */
143
144 /* Reads all the cases from the active file, transforms them by the
145    active set of transformations, calls PROCFUNC with CURCASE set to
146    the case and CASENUM set to the case number, and writes them to a
147    new active file.
148
149    Divides the active file into zero or more series of one or more
150    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
151    called after each series. */
152 void
153 procedure (void (*beginfunc) (void),
154            int (*procfunc) (struct ccase *curcase),
155            void (*endfunc) (void))
156 {
157   end_func = endfunc;
158   write_case = procedure_write_case;
159
160   if (dict_get_split_cnt (default_dict) != 0 && procfunc != NULL)
161     {
162       virt_proc_func = procfunc;
163       proc_func = SPLIT_FILE_procfunc;
164       
165       virt_begin_func = beginfunc;
166       begin_func = NULL;
167     } else {
168       begin_func = beginfunc;
169       proc_func = procfunc;
170     }
171
172   last_vfm_invocation = time (NULL);
173
174   open_active_file ();
175   vfm_source->read ();
176   close_active_file ();
177 }
178 \f
179 /* Active file processing support.  Subtly different semantics from
180    procedure(). */
181
182 static int process_active_file_write_case (void);
183
184 /* The casefunc might want us to stop calling it. */
185 static int not_canceled;
186
187 /* Reads all the cases from the active file and passes them one-by-one
188    to CASEFUNC in temp_case.  Before any cases are passed, calls
189    BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
190    BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
191    file by calling process_active_file_output_case().
192
193    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
194 void
195 process_active_file (void (*beginfunc) (void),
196                      int (*casefunc) (struct ccase *curcase),
197                      void (*endfunc) (void))
198 {
199   proc_func = casefunc;
200   write_case = process_active_file_write_case;
201   not_canceled = 1;
202
203   open_active_file ();
204   beginfunc ();
205   
206   /* There doesn't necessarily need to be an active file. */
207   if (vfm_source)
208     vfm_source->read ();
209   
210   endfunc ();
211   close_active_file ();
212 }
213
214 /* Pass the current case to casefunc. */
215 static int
216 process_active_file_write_case (void)
217 {
218   /* Index of current transformation. */
219   int cur_trns;
220
221   for (cur_trns = f_trns ; cur_trns != temp_trns; )
222     {
223       int code;
224         
225       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
226       switch (code)
227         {
228         case -1:
229           /* Next transformation. */
230           cur_trns++;
231           break;
232         case -2:
233           /* Delete this case. */
234           goto done;
235         default:
236           /* Go to that transformation. */
237           cur_trns = code;
238           break;
239         }
240     }
241
242   if (n_lag)
243     lag_case ();
244           
245   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
246   if (not_canceled
247       && !FILTERED
248       && (process_if_expr == NULL ||
249           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
250     not_canceled = proc_func (temp_case);
251   
252   case_count++;
253   
254  done:
255   {
256     long *lp;
257
258     /* This case is finished.  Initialize the variables for the next case. */
259     for (lp = reinit_sysmis.vec; *lp != -1;)
260       temp_case->data[*lp++].f = SYSMIS;
261     for (lp = reinit_blanks.vec; *lp != -1;)
262       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
263   }
264   
265   return 1;
266 }
267
268 /* Write temp_case to the active file. */
269 void
270 process_active_file_output_case (void)
271 {
272   vfm_sink_info.ncases++;
273   vfm_sink->write ();
274 }
275 \f
276 /* Opening the active file. */
277
278 /* It might be usefully noted that the following several functions are
279    given in the order that they are called by open_active_file(). */
280
281 /* Prepare to write to the replacement active file. */
282 static void
283 prepare_for_writing (void)
284 {
285   /* FIXME: If ALL the conditions listed below hold true, then the
286      replacement active file is guaranteed to be identical to the
287      original active file:
288
289      1. TEMPORARY was the first transformation, OR, there were no
290      transformations at all.
291
292      2. Input is not coming from an input program.
293
294      3. Compaction is not necessary.
295
296      So, in this case, we shouldn't have to replace the active
297      file--it's just a waste of time and space. */
298
299   vfm_sink_info.ncases = 0;
300   vfm_sink_info.nval = dict_get_value_cnt (default_dict);
301   vfm_sink_info.case_size = (sizeof (struct ccase)
302                              + ((dict_get_value_cnt (default_dict) - 1)
303                                 * sizeof (union value)));
304   
305   if (vfm_sink == NULL)
306     {
307       if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
308           && !paging)
309         {
310           msg (MW, _("Workspace overflow predicted.  Max workspace is "
311                      "currently set to %d KB (%d cases at %d bytes each).  "
312                      "Paging active file to disk."),
313                MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
314                vfm_sink_info.case_size);
315           
316           paging = 1;
317         }
318       
319       vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
320     }
321 }
322
323 /* Arrange for compacting the output cases for storage. */
324 static void
325 arrange_compaction (void)
326 {
327   int count_values = 0;
328
329   {
330     int i;
331     
332     /* Count up the number of `value's that will be output. */
333     for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
334       {
335         struct variable *v = dict_get_var (temp_dict, i);
336
337         if (v->name[0] != '#')
338           {
339             assert (v->nv > 0);
340             count_values += v->nv;
341           } 
342       }
343     assert (temporary == 2 || count_values <= dict_get_value_cnt (temp_dict));
344   }
345   
346   /* Compaction is only necessary if the number of `value's to output
347      differs from the number already present. */
348   compaction_nval = count_values;
349   compaction_necessary = (temporary == 2
350                           || count_values != dict_get_value_cnt (temp_dict));
351   
352   if (vfm_sink->init)
353     vfm_sink->init ();
354 }
355
356 /* Prepares the temporary case and compaction case. */
357 static void
358 make_temp_case (void)
359 {
360   temp_case = xmalloc (vfm_sink_info.case_size);
361
362   if (compaction_necessary)
363     compaction_case = xmalloc (sizeof (struct ccase)
364                                + sizeof (union value) * (compaction_nval - 1));
365 }
366
367 #if DEBUGGING
368 /* Returns the name of the variable that owns the index CCASE_INDEX
369    into ccase. */
370 static const char *
371 index_to_varname (int ccase_index)
372 {
373   int i;
374
375   for (i = 0; i < default_dict.nvar; i++)
376     {
377       struct variable *v = default_dict.var[i];
378       
379       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
380         return default_dict.var[i]->name;
381     }
382   return _("<NOVAR>");
383 }
384 #endif
385
386 /* Initializes temp_case from the vectors that say which `value's need
387    to be initialized just once, and which ones need to be
388    re-initialized before every case. */
389 static void
390 vector_initialization (void)
391 {
392   int i;
393   long *lp;
394   
395   /* Just once. */
396   for (i = 0; i < init_zero.n; i++)
397     temp_case->data[init_zero.vec[i]].f = 0.0;
398   for (i = 0; i < init_blanks.n; i++)
399     memset (temp_case->data[init_blanks.vec[i]].s, ' ', MAX_SHORT_STRING);
400
401   /* These vectors need to be repeatedly accessed, so we add a
402      sentinel to (hopefully) improve speed. */
403   vec_insert (&reinit_sysmis, -1);
404   vec_insert (&reinit_blanks, -1);
405
406   for (lp = reinit_sysmis.vec; *lp != -1;)
407     temp_case->data[*lp++].f = SYSMIS;
408   for (lp = reinit_blanks.vec; *lp != -1;)
409     memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
410   
411 #if DEBUGGING
412   printf ("vfm: init_zero=");
413   for (i = 0; i < init_zero.n; i++)
414     printf ("%s%s", i ? "," : "", index_to_varname (init_zero.vec[i]));
415   printf (" init_blanks=");
416   for (i = 0; i < init_blanks.n; i++)
417     printf ("%s%s", i ? "," : "", index_to_varname (init_blanks.vec[i]));
418   printf (" reinit_sysmis=");
419   for (lp = reinit_sysmis.vec; *lp != -1; lp++)
420     printf ("%s%s", lp != reinit_sysmis.vec ? "," : "",
421             index_to_varname (*lp));
422   printf (" reinit_blanks=");
423   for (lp = reinit_blanks.vec; *lp != -1; lp++)
424     printf ("%s%s", lp != reinit_blanks.vec ? "," : "",
425             index_to_varname (*lp));
426   printf ("\n");
427 #endif
428 }
429
430 /* Sets filter_index to an appropriate value. */
431 static void
432 setup_filter (void)
433 {
434   filter_var = dict_get_filter (default_dict);
435   
436   if (filter_var != NULL)
437     {
438       assert (filter_var->type == NUMERIC);
439       filter_index = filter_var->index;
440     } else {
441       filter_index = -1;
442     }
443 }
444
445 /* Sets all the lag-related variables based on value of n_lag. */
446 static void
447 setup_lag (void)
448 {
449   int i;
450   
451   if (n_lag == 0)
452     return;
453
454   lag_count = 0;
455   lag_head = 0;
456   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
457   for (i = 0; i < n_lag; i++)
458     lag_queue[i] = xmalloc (dict_get_value_cnt (temp_dict)
459                             * sizeof **lag_queue);
460 }
461
462 /* There is a lot of potential confusion in the vfm and related
463    routines over the number of `value's at each stage of the process.
464    Here is each nval count, with explanation, as set up by
465    open_active_file():
466
467    vfm_source_info.nval: Number of `value's in the cases returned by
468    the source stream.  This value turns out not to be very useful, but
469    we maintain it anyway.
470
471    vfm_sink_info.nval: Number of `value's in the cases after all
472    transformations have been performed.  Never less than
473    vfm_source_info.nval.
474
475    temp_dict->nval: Number of `value's in the cases after the
476    transformations leading up to TEMPORARY have been performed.  If
477    TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
478    Never less than vfm_sink_info.nval.
479
480    compaction_nval: Number of `value's in the cases after the
481    transformations leading up to TEMPORARY have been performed and the
482    case has been compacted by compact_case(), if compaction is
483    necessary.  This the number of `value's in the cases saved by the
484    sink stream.  (However, note that the cases passed to the sink
485    stream have not yet been compacted.  It is the responsibility of
486    the data sink to call compact_case().)  This may be less than,
487    greater than, or equal to vfm_source_info.nval.  `compaction'
488    becomes the new value of default_dict.nval after the procedure is
489    completed.
490
491    default_dict.nval: This is often an alias for temp_dict->nval.  As
492    such it can really have no separate existence until the procedure
493    is complete.  For this reason it should *not* be referenced inside
494    the execution of a procedure. */
495 /* Makes all preparations for reading from the data source and writing
496    to the data sink. */
497 static void
498 open_active_file (void)
499 {
500   /* Sometimes we want to refer to the dictionary that applies to the
501      data actually written to the sink.  This is either temp_dict or
502      default_dict.  However, if TEMPORARY is not on, then temp_dict
503      does not apply.  So, we can set temp_dict to default_dict in this
504      case. */
505   if (!temporary)
506     {
507       temp_trns = n_trns;
508       temp_dict = default_dict;
509     }
510
511   /* No cases passed to the procedure yet. */
512   case_count = 0;
513
514   /* The rest. */
515   prepare_for_writing ();
516   arrange_compaction ();
517   make_temp_case ();
518   vector_initialization ();
519   discard_ctl_stack ();
520   setup_filter ();
521   setup_lag ();
522
523   /* Debug output. */
524   debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
525                  vfm_source->name, vfm_sink->name));
526   debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
527                  "temp_dict->nval=%d, compaction_nval=%d, "
528                  "default_dict.nval=%d\n",
529                  vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
530                  compaction_nval, default_dict.nval));
531 }
532 \f
533 /* Closes the active file. */
534 static void
535 close_active_file (void)
536 {
537   /* Close the current case group. */
538   if (case_count && end_func != NULL)
539     end_func ();
540
541   /* Stop lagging (catch up?). */
542   if (n_lag)
543     {
544       int i;
545       
546       for (i = 0; i < n_lag; i++)
547         free (lag_queue[i]);
548       free (lag_queue);
549       n_lag = 0;
550     }
551   
552   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
553      off TEMPORARY. */
554   if (temporary)
555     {
556       dict_destroy (default_dict);
557       default_dict = temp_dict;
558       temp_dict = NULL;
559     }
560
561   /* Finish compaction. */
562   if (compaction_necessary)
563     finish_compaction ();
564     
565   /* Old data sink --> New data source. */
566   if (vfm_source && vfm_source->destroy_source)
567     vfm_source->destroy_source ();
568   
569   vfm_source = vfm_sink;
570   vfm_source_info.ncases = vfm_sink_info.ncases;
571   vfm_source_info.nval = compaction_nval;
572   vfm_source_info.case_size = (sizeof (struct ccase)
573                                + (compaction_nval - 1) * sizeof (union value));
574   if (vfm_source->mode)
575     vfm_source->mode ();
576
577   /* Old data sink is gone now. */
578   vfm_sink = NULL;
579
580   /* Cancel TEMPORARY. */
581   cancel_temporary ();
582
583   /* Free temporary cases. */
584   free (temp_case);
585   temp_case = NULL;
586
587   free (compaction_case);
588   compaction_case = NULL;
589
590   /* Cancel PROCESS IF. */
591   expr_free (process_if_expr);
592   process_if_expr = NULL;
593
594   /* Cancel FILTER if temporary. */
595   if (filter_var != NULL && !FILTER_before_TEMPORARY)
596     dict_set_filter (default_dict, NULL);
597
598   /* Cancel transformations. */
599   cancel_transformations ();
600
601   /* Clear value-initialization vectors. */
602   vec_clear (&init_zero);
603   vec_clear (&init_blanks);
604   vec_clear (&reinit_sysmis);
605   vec_clear (&reinit_blanks);
606
607   /* Turn off case limiter. */
608   dict_set_case_limit (default_dict, 0);
609
610   /* Clear VECTOR vectors. */
611   dict_clear_vectors (default_dict);
612
613   debug_printf (("vfm: procedure complete\n\n"));
614 }
615 \f
616 /* Disk case stream. */
617
618 /* Associated files. */
619 FILE *disk_source_file;
620 FILE *disk_sink_file;
621
622 /* Initializes the disk sink. */
623 static void
624 disk_stream_init (void)
625 {
626   disk_sink_file = tmpfile ();
627   if (!disk_sink_file)
628     {
629       msg (ME, _("An error occurred attempting to create a temporary "
630                  "file for use as the active file: %s."),
631            strerror (errno));
632       err_failure ();
633     }
634 }
635
636 /* Reads all cases from the disk source and passes them one by one to
637    write_case(). */
638 static void
639 disk_stream_read (void)
640 {
641   int i;
642
643   for (i = 0; i < vfm_source_info.ncases; i++)
644     {
645       if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
646         {
647           msg (ME, _("An error occurred while attempting to read from "
648                "a temporary file created for the active file: %s."),
649                strerror (errno));
650           err_failure ();
651           return;
652         }
653
654       if (!write_case ())
655         return;
656     }
657 }
658
659 /* Writes temp_case to the disk sink. */
660 static void
661 disk_stream_write (void)
662 {
663   union value *src_case;
664
665   if (compaction_necessary)
666     {
667       compact_case (compaction_case, temp_case);
668       src_case = (union value *) compaction_case;
669     }
670   else src_case = (union value *) temp_case;
671
672   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
673               disk_sink_file) != 1)
674     {
675       msg (ME, _("An error occurred while attempting to write to a "
676                  "temporary file used as the active file: %s."),
677            strerror (errno));
678       err_failure ();
679     }
680 }
681
682 /* Switches the stream from a sink to a source. */
683 static void
684 disk_stream_mode (void)
685 {
686   /* Rewind the sink. */
687   if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
688     {
689       msg (ME, _("An error occurred while attempting to rewind a "
690                  "temporary file used as the active file: %s."),
691            strerror (errno));
692       err_failure ();
693     }
694   
695   /* Sink --> source variables. */
696   disk_source_file = disk_sink_file;
697 }
698
699 /* Destroys the source's internal data. */
700 static void
701 disk_stream_destroy_source (void)
702 {
703   if (disk_source_file)
704     {
705       fclose (disk_source_file);
706       disk_source_file = NULL;
707     }
708 }
709
710 /* Destroys the sink's internal data. */
711 static void
712 disk_stream_destroy_sink (void)
713 {
714   if (disk_sink_file)
715     {
716       fclose (disk_sink_file);
717       disk_sink_file = NULL;
718     }
719 }
720
721 /* Disk stream. */
722 struct case_stream vfm_disk_stream = 
723   {
724     disk_stream_init,
725     disk_stream_read,
726     disk_stream_write,
727     disk_stream_mode,
728     disk_stream_destroy_source,
729     disk_stream_destroy_sink,
730     "disk",
731   };
732 \f
733 /* Memory case stream. */
734
735 /* List of cases stored in the stream. */
736 struct case_list *memory_source_cases;
737 struct case_list *memory_sink_cases;
738
739 /* Current case. */
740 struct case_list *memory_sink_iter;
741
742 /* Maximum number of cases. */
743 int memory_sink_max_cases;
744
745 /* Initializes the memory stream variables for writing. */
746 static void
747 memory_stream_init (void)
748 {
749   memory_sink_cases = NULL;
750   memory_sink_iter = NULL;
751   
752   assert (compaction_nval);
753   memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
754 }
755
756 /* Reads the case stream from memory and passes it to write_case(). */
757 static void
758 memory_stream_read (void)
759 {
760   while (memory_source_cases != NULL)
761     {
762       memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
763       
764       {
765         struct case_list *current = memory_source_cases;
766         memory_source_cases = memory_source_cases->next;
767         free (current);
768       }
769       
770       if (!write_case ())
771         return;
772     }
773 }
774
775 /* Writes temp_case to the memory stream. */
776 static void
777 memory_stream_write (void)
778 {
779   struct case_list *new_case = malloc (sizeof (struct case_list)
780                                        + ((compaction_nval - 1)
781                                           * sizeof (union value)));
782
783   /* If we've got memory to spare then add it to the linked list. */
784   if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
785     {
786       if (compaction_necessary)
787         compact_case (&new_case->c, temp_case);
788       else
789         memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
790
791       /* Append case to linked list. */
792       if (memory_sink_cases)
793         memory_sink_iter = memory_sink_iter->next = new_case;
794       else
795         memory_sink_iter = memory_sink_cases = new_case;
796     }
797   else
798     {
799       /* Out of memory.  Write the active file to disk. */
800       struct case_list *cur, *next;
801
802       /* Notify the user. */
803       if (!new_case)
804         msg (MW, _("Virtual memory exhausted.  Paging active file "
805                    "to disk."));
806       else
807         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
808                    "overflowed.  Paging active file to disk."),
809              MAX_WORKSPACE / 1024, memory_sink_max_cases,
810              compaction_nval * sizeof (union value));
811
812       free (new_case);
813
814       /* Switch to a disk sink. */
815       vfm_sink = &vfm_disk_stream;
816       vfm_sink->init ();
817       paging = 1;
818
819       /* Terminate the list. */
820       if (memory_sink_iter)
821         memory_sink_iter->next = NULL;
822
823       /* Write the cases to disk and destroy them.  We can't call
824          vfm->sink->write() because of compaction. */
825       for (cur = memory_sink_cases; cur; cur = next)
826         {
827           next = cur->next;
828           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
829                       disk_sink_file) != 1)
830             {
831               msg (ME, _("An error occurred while attempting to "
832                          "write to a temporary file created as the "
833                          "active file, while paging to disk: %s."),
834                    strerror (errno));
835               err_failure ();
836             }
837           free (cur);
838         }
839
840       /* Write the current case to disk. */
841       vfm_sink->write ();
842     }
843 }
844
845 /* If the data is stored in memory, causes it to be written to disk.
846    To be called only *between* procedure()s, not within them. */
847 void
848 page_to_disk (void)
849 {
850   if (vfm_source == &vfm_memory_stream)
851     {
852       /* Switch to a disk sink. */
853       vfm_sink = &vfm_disk_stream;
854       vfm_sink->init ();
855       paging = 1;
856       
857       /* Write the cases to disk and destroy them.  We can't call
858          vfm->sink->write() because of compaction. */
859       {
860         struct case_list *cur, *next;
861         
862         for (cur = memory_source_cases; cur; cur = next)
863           {
864             next = cur->next;
865             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
866                         disk_sink_file) != 1)
867               {
868                 msg (ME, _("An error occurred while attempting to "
869                            "write to a temporary file created as the "
870                            "active file, while paging to disk: %s."),
871                      strerror (errno));
872                 err_failure ();
873               }
874             free (cur);
875           }
876       }
877       
878       vfm_source = &vfm_disk_stream;
879       vfm_source->mode ();
880
881       vfm_sink = NULL;
882     }
883 }
884
885 /* Switch the memory stream from sink to source mode. */
886 static void
887 memory_stream_mode (void)
888 {
889   /* Terminate the list. */
890   if (memory_sink_iter)
891     memory_sink_iter->next = NULL;
892
893   /* Sink --> source variables. */
894   memory_source_cases = memory_sink_cases;
895   memory_sink_cases = NULL;
896 }
897
898 /* Destroy all memory source data. */
899 static void
900 memory_stream_destroy_source (void)
901 {
902   struct case_list *cur, *next;
903   
904   for (cur = memory_source_cases; cur; cur = next)
905     {
906       next = cur->next;
907       free (cur);
908     }
909   memory_source_cases = NULL;
910 }
911
912 /* Destroy all memory sink data. */
913 static void
914 memory_stream_destroy_sink (void)
915 {
916   struct case_list *cur, *next;
917   
918   for (cur = memory_sink_cases; cur; cur = next)
919     {
920       next = cur->next;
921       free (cur);
922     }
923   memory_sink_cases = NULL;
924 }
925   
926 /* Memory stream. */
927 struct case_stream vfm_memory_stream = 
928   {
929     memory_stream_init,
930     memory_stream_read,
931     memory_stream_write,
932     memory_stream_mode,
933     memory_stream_destroy_source,
934     memory_stream_destroy_sink,
935     "memory",
936   };
937 \f
938 #include "debug-print.h"
939
940 /* Add temp_case to the lag queue. */
941 static void
942 lag_case (void)
943 {
944   if (lag_count < n_lag)
945     lag_count++;
946   memcpy (lag_queue[lag_head], temp_case,
947           sizeof (union value) * dict_get_value_cnt (temp_dict));
948   if (++lag_head >= n_lag)
949     lag_head = 0;
950 }
951
952 /* Returns a pointer to the lagged case from N_BEFORE cases before the
953    current one, or NULL if there haven't been that many cases yet. */
954 struct ccase *
955 lagged_case (int n_before)
956 {
957   assert (n_before <= n_lag);
958   if (n_before > lag_count)
959     return NULL;
960   
961   {
962     int index = lag_head - n_before;
963     if (index < 0)
964       index += n_lag;
965     return lag_queue[index];
966   }
967 }
968    
969 /* Transforms temp_case and writes it to the replacement active file
970    if advisable.  Returns nonzero if more cases can be accepted, zero
971    otherwise.  Do not call this function again after it has returned
972    zero once.  */
973 int
974 procedure_write_case (void)
975 {
976   /* Index of current transformation. */
977   int cur_trns;
978
979   /* Return value: whether it's reasonable to write any more cases. */
980   int more_cases = 1;
981
982   debug_printf ((_("transform: ")));
983
984   cur_trns = f_trns;
985   for (;;)
986     {
987       /* Output the case if this is temp_trns. */
988       if (cur_trns == temp_trns)
989         {
990           debug_printf (("REC"));
991
992           if (n_lag)
993             lag_case ();
994           
995           vfm_sink_info.ncases++;
996           vfm_sink->write ();
997
998           if (dict_get_case_limit (default_dict))
999             more_cases = (vfm_sink_info.ncases
1000                           < dict_get_case_limit (default_dict));
1001         }
1002
1003       /* Are we done? */
1004       if (cur_trns >= n_trns)
1005         break;
1006       
1007       debug_printf (("$%d", cur_trns));
1008
1009       /* Decide which transformation should come next. */
1010       {
1011         int code;
1012         
1013         code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
1014         switch (code)
1015           {
1016           case -1:
1017             /* Next transformation. */
1018             cur_trns++;
1019             break;
1020           case -2:
1021             /* Delete this case. */
1022             goto done;
1023           default:
1024             /* Go to that transformation. */
1025             cur_trns = code;
1026             break;
1027           }
1028       }
1029     }
1030
1031   /* Call the beginning of group function. */
1032   if (!case_count && begin_func != NULL)
1033     begin_func ();
1034
1035   /* Call the procedure if there is one and FILTER and PROCESS IF
1036      don't prohibit it. */
1037   if (proc_func != NULL
1038       && !FILTERED
1039       && (process_if_expr == NULL ||
1040           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
1041     proc_func (temp_case);
1042
1043   case_count++;
1044   
1045 done:
1046   debug_putc ('\n', stdout);
1047   
1048   {
1049     long *lp;
1050
1051     /* This case is finished.  Initialize the variables for the next case. */
1052     for (lp = reinit_sysmis.vec; *lp != -1;)
1053       temp_case->data[*lp++].f = SYSMIS;
1054     for (lp = reinit_blanks.vec; *lp != -1;)
1055       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
1056   }
1057   
1058   /* Return previously determined value. */
1059   return more_cases;
1060 }
1061
1062 /* Appends TRNS to t_trns[], the list of all transformations to be
1063    performed on data as it is read from the active file. */
1064 void
1065 add_transformation (struct trns_header * trns)
1066 {
1067   if (n_trns >= m_trns)
1068     {
1069       m_trns += 16;
1070       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1071     }
1072   t_trns[n_trns] = trns;
1073   trns->index = n_trns++;
1074 }
1075
1076 /* Cancels all active transformations, including any transformations
1077    created by the input program. */
1078 void
1079 cancel_transformations (void)
1080 {
1081   int i;
1082   for (i = 0; i < n_trns; i++)
1083     {
1084       if (t_trns[i]->free)
1085         t_trns[i]->free (t_trns[i]);
1086       free (t_trns[i]);
1087     }
1088   n_trns = f_trns = 0;
1089   if (m_trns > 32)
1090     {
1091       free (t_trns);
1092       m_trns = 0;
1093     }
1094 }
1095
1096 /* Dumps out the values of all the split variables for the case C. */
1097 static void
1098 dump_splits (struct ccase *c)
1099 {
1100   struct variable *const *split;
1101   struct tab_table *t;
1102   size_t split_cnt;
1103   int i;
1104
1105   split_cnt = dict_get_split_cnt (default_dict);
1106   t = tab_create (3, split_cnt + 1, 0);
1107   tab_dim (t, tab_natural_dimensions);
1108   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
1109   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
1110   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1111   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1112   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1113   split = dict_get_split_vars (default_dict);
1114   for (i = 0; i < split_cnt; i++)
1115     {
1116       struct variable *v = split[i];
1117       char temp_buf[80];
1118       const char *val_lab;
1119
1120       assert (v->type == NUMERIC || v->type == ALPHA);
1121       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1122       
1123       {
1124         union value val = c->data[v->fv];
1125         if (v->type == ALPHA)
1126           val.c = c->data[v->fv].s;
1127         data_out (temp_buf, &v->print, &val);
1128       }
1129       
1130       temp_buf[v->print.w] = 0;
1131       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1132
1133       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1134       if (val_lab)
1135         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1136     }
1137   tab_flags (t, SOMF_NO_TITLE);
1138   tab_submit (t);
1139 }
1140
1141 /* This procfunc is substituted for the user-supplied procfunc when
1142    SPLIT FILE is active.  This function forms a wrapper around that
1143    procfunc by dividing the input into series. */
1144 static int
1145 SPLIT_FILE_procfunc (struct ccase *c)
1146 {
1147   static struct ccase *prev_case;
1148   struct variable *const *split;
1149   size_t split_cnt;
1150   size_t i;
1151
1152   /* The first case always begins a new series.  We also need to
1153      preserve the values of the case for later comparison. */
1154   if (case_count == 0)
1155     {
1156       if (prev_case)
1157         free (prev_case);
1158       prev_case = xmalloc (vfm_sink_info.case_size);
1159       memcpy (prev_case, c, vfm_sink_info.case_size);
1160
1161       dump_splits (c);
1162       if (virt_begin_func != NULL)
1163         virt_begin_func ();
1164       
1165       return virt_proc_func (c);
1166     }
1167
1168   /* Compare the value of each SPLIT FILE variable to the values on
1169      the previous case. */
1170   split = dict_get_split_vars (default_dict);
1171   split_cnt = dict_get_split_cnt (default_dict);
1172   for (i = 0; i < split_cnt; i++)
1173     {
1174       struct variable *v = split[i];
1175       
1176       switch (v->type)
1177         {
1178         case NUMERIC:
1179           if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
1180             goto not_equal;
1181           break;
1182         case ALPHA:
1183           if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
1184             goto not_equal;
1185           break;
1186         default:
1187           assert (0);
1188         }
1189     }
1190   return virt_proc_func (c);
1191   
1192 not_equal:
1193   /* The values of the SPLIT FILE variable are different from the
1194      values on the previous case.  That means that it's time to begin
1195      a new series. */
1196   if (end_func != NULL)
1197     end_func ();
1198   dump_splits (c);
1199   if (virt_begin_func != NULL)
1200     virt_begin_func ();
1201   memcpy (prev_case, c, vfm_sink_info.case_size);
1202   return virt_proc_func (c);
1203 }
1204 \f
1205 /* Case compaction. */
1206
1207 /* Copies case SRC to case DEST, compacting it in the process. */
1208 void
1209 compact_case (struct ccase *dest, const struct ccase *src)
1210 {
1211   int i;
1212   int nval = 0;
1213   size_t var_cnt;
1214   
1215   assert (compaction_necessary);
1216
1217   if (temporary == 2)
1218     {
1219       if (dest != compaction_case)
1220         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1221       return;
1222     }
1223
1224   /* Copy all the variables except the scratch variables from SRC to
1225      DEST. */
1226   var_cnt = dict_get_var_cnt (default_dict);
1227   for (i = 0; i < var_cnt; i++)
1228     {
1229       struct variable *v = dict_get_var (default_dict, i);
1230       
1231       if (v->name[0] == '#')
1232         continue;
1233
1234       if (v->type == NUMERIC)
1235         dest->data[nval++] = src->data[v->fv];
1236       else
1237         {
1238           int w = DIV_RND_UP (v->width, sizeof (union value));
1239           
1240           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1241           nval += w;
1242         }
1243     }
1244 }
1245
1246 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1247 static void
1248 finish_compaction (void)
1249 {
1250   int i;
1251
1252   for (i = 0; i < dict_get_var_cnt (default_dict); )
1253     {
1254       struct variable *v = dict_get_var (default_dict, i);
1255
1256       if (v->name[0] == '#') 
1257         dict_delete_var (default_dict, v);
1258       else
1259         i++;
1260     }
1261   dict_compact_values (default_dict);
1262 }
1263
1264