Tue Dec 30 22:42:57 2003 Ben Pfaff <blp@gnu.org>
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include <assert.h>
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "approx.h"
32 #include "do-ifP.h"
33 #include "error.h"
34 #include "expr.h"
35 #include "misc.h"
36 #include "random.h"
37 #include "som.h"
38 #include "str.h"
39 #include "tab.h"
40 #include "var.h"
41 #include "value-labels.h"
42
43 /*
44    Virtual File Manager (vfm):
45
46    vfm is used to process data files.  It uses the model that data is
47    read from one stream (the data source), then written to another
48    (the data sink).  The data source is then deleted and the data sink
49    becomes the data source for the next procedure. */
50
51 #include "debug-print.h"
52
53 /* This is used to read from the active file. */
54 struct case_stream *vfm_source;
55
56 /* `value' indexes to initialize to particular values for certain cases. */
57 struct long_vec reinit_sysmis;          /* SYSMIS for every case. */
58 struct long_vec reinit_blanks;          /* Blanks for every case. */
59 struct long_vec init_zero;              /* Zero for first case only. */
60 struct long_vec init_blanks;            /* Blanks for first case only. */
61
62 /* This is used to write to the replacement active file. */
63 struct case_stream *vfm_sink;
64
65 /* Information about the data source. */
66 struct stream_info vfm_source_info;
67
68 /* Information about the data sink. */
69 struct stream_info vfm_sink_info;
70
71 /* Filter variable and  `value' index. */
72 static struct variable *filter_var;
73 static int filter_index;
74
75 #define FILTERED                                                        \
76         (filter_index != -1                                             \
77          && (temp_case->data[filter_index].f == 0.0                     \
78              || temp_case->data[filter_index].f == SYSMIS               \
79              || is_num_user_missing (temp_case->data[filter_index].f,   \
80                                      filter_var)))
81
82 /* Nonzero if the case needs to have values deleted before being
83    stored, zero otherwise. */
84 int compaction_necessary;
85
86 /* Number of values after compaction, or the same as
87    vfm_sink_info.nval, if compaction is not necessary. */
88 int compaction_nval;
89
90 /* Temporary case buffer with enough room for `compaction_nval'
91    `value's. */
92 struct ccase *compaction_case;
93
94 /* Within a session, when paging is turned on, it is never turned back
95    off.  This policy might be too aggressive. */
96 static int paging = 0;
97
98 /* Time at which vfm was last invoked. */
99 time_t last_vfm_invocation;
100
101 /* Functions called during procedure processing. */
102 static int (*proc_func) (struct ccase *);       /* Called for each case. */
103 static int (*virt_proc_func) (struct ccase *);  /* From SPLIT_FILE_procfunc. */
104 static void (*begin_func) (void);       /* Called at beginning of a series. */
105 static void (*virt_begin_func) (void);  /* Called by SPLIT_FILE_procfunc. */
106 static void (*end_func) (void); /* Called after end of a series. */
107 int (*write_case) (void);
108
109 /* Number of cases passed to proc_func(). */
110 static int case_count;
111
112 /* Lag queue. */
113 int n_lag;                      /* Number of cases to lag. */
114 static int lag_count;           /* Number of cases in lag_queue so far. */
115 static int lag_head;            /* Index where next case will be added. */
116 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
117
118 static void open_active_file (void);
119 static void close_active_file (void);
120 static int SPLIT_FILE_procfunc (struct ccase *);
121 static void finish_compaction (void);
122 static void lag_case (void);
123 static int procedure_write_case (void);
124 \f
125 /* Public functions. */
126
127 /* Reads all the cases from the active file, transforms them by the
128    active set of transformations, calls PROCFUNC with CURCASE set to
129    the case and CASENUM set to the case number, and writes them to a
130    new active file.
131
132    Divides the active file into zero or more series of one or more
133    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
134    called after each series. */
135 void
136 procedure (void (*beginfunc) (void),
137            int (*procfunc) (struct ccase *curcase),
138            void (*endfunc) (void))
139 {
140   end_func = endfunc;
141   write_case = procedure_write_case;
142
143   if (dict_get_split_cnt (default_dict) != 0 && procfunc != NULL)
144     {
145       virt_proc_func = procfunc;
146       proc_func = SPLIT_FILE_procfunc;
147       
148       virt_begin_func = beginfunc;
149       begin_func = NULL;
150     } else {
151       begin_func = beginfunc;
152       proc_func = procfunc;
153     }
154
155   last_vfm_invocation = time (NULL);
156
157   open_active_file ();
158   vfm_source->read ();
159   close_active_file ();
160 }
161 \f
162 /* Active file processing support.  Subtly different semantics from
163    procedure(). */
164
165 static int process_active_file_write_case (void);
166
167 /* The casefunc might want us to stop calling it. */
168 static int not_canceled;
169
170 /* Reads all the cases from the active file and passes them one-by-one
171    to CASEFUNC in temp_case.  Before any cases are passed, calls
172    BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
173    BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
174    file by calling process_active_file_output_case().
175
176    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
177 void
178 process_active_file (void (*beginfunc) (void),
179                      int (*casefunc) (struct ccase *curcase),
180                      void (*endfunc) (void))
181 {
182   proc_func = casefunc;
183   write_case = process_active_file_write_case;
184   not_canceled = 1;
185
186   open_active_file ();
187   beginfunc ();
188   
189   /* There doesn't necessarily need to be an active file. */
190   if (vfm_source)
191     vfm_source->read ();
192   
193   endfunc ();
194   close_active_file ();
195 }
196
197 /* Pass the current case to casefunc. */
198 static int
199 process_active_file_write_case (void)
200 {
201   /* Index of current transformation. */
202   int cur_trns;
203
204   for (cur_trns = f_trns ; cur_trns != temp_trns; )
205     {
206       int code;
207         
208       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
209       switch (code)
210         {
211         case -1:
212           /* Next transformation. */
213           cur_trns++;
214           break;
215         case -2:
216           /* Delete this case. */
217           goto done;
218         default:
219           /* Go to that transformation. */
220           cur_trns = code;
221           break;
222         }
223     }
224
225   if (n_lag)
226     lag_case ();
227           
228   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
229   if (not_canceled
230       && !FILTERED
231       && (process_if_expr == NULL ||
232           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
233     not_canceled = proc_func (temp_case);
234   
235   case_count++;
236   
237  done:
238   {
239     long *lp;
240
241     /* This case is finished.  Initialize the variables for the next case. */
242     for (lp = reinit_sysmis.vec; *lp != -1;)
243       temp_case->data[*lp++].f = SYSMIS;
244     for (lp = reinit_blanks.vec; *lp != -1;)
245       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
246   }
247   
248   return 1;
249 }
250
251 /* Write temp_case to the active file. */
252 void
253 process_active_file_output_case (void)
254 {
255   vfm_sink_info.ncases++;
256   vfm_sink->write ();
257 }
258 \f
259 /* Opening the active file. */
260
261 /* It might be usefully noted that the following several functions are
262    given in the order that they are called by open_active_file(). */
263
264 /* Prepare to write to the replacement active file. */
265 static void
266 prepare_for_writing (void)
267 {
268   /* FIXME: If ALL the conditions listed below hold true, then the
269      replacement active file is guaranteed to be identical to the
270      original active file:
271
272      1. TEMPORARY was the first transformation, OR, there were no
273      transformations at all.
274
275      2. Input is not coming from an input program.
276
277      3. Compaction is not necessary.
278
279      So, in this case, we shouldn't have to replace the active
280      file--it's just a waste of time and space. */
281
282   vfm_sink_info.ncases = 0;
283   vfm_sink_info.nval = dict_get_value_cnt (default_dict);
284   vfm_sink_info.case_size = (sizeof (struct ccase)
285                              + ((dict_get_value_cnt (default_dict) - 1)
286                                 * sizeof (union value)));
287   
288   if (vfm_sink == NULL)
289     {
290       if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
291           && !paging)
292         {
293           msg (MW, _("Workspace overflow predicted.  Max workspace is "
294                      "currently set to %d KB (%d cases at %d bytes each).  "
295                      "Paging active file to disk."),
296                MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
297                vfm_sink_info.case_size);
298           
299           paging = 1;
300         }
301       
302       vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
303     }
304 }
305
306 /* Arrange for compacting the output cases for storage. */
307 static void
308 arrange_compaction (void)
309 {
310   int count_values = 0;
311
312   {
313     int i;
314     
315     /* Count up the number of `value's that will be output. */
316     for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
317       {
318         struct variable *v = dict_get_var (temp_dict, i);
319
320         if (v->name[0] != '#')
321           {
322             assert (v->nv > 0);
323             count_values += v->nv;
324           } 
325       }
326     assert (temporary == 2 || count_values <= dict_get_value_cnt (temp_dict));
327   }
328   
329   /* Compaction is only necessary if the number of `value's to output
330      differs from the number already present. */
331   compaction_nval = count_values;
332   compaction_necessary = (temporary == 2
333                           || count_values != dict_get_value_cnt (temp_dict));
334   
335   if (vfm_sink->init)
336     vfm_sink->init ();
337 }
338
339 /* Prepares the temporary case and compaction case. */
340 static void
341 make_temp_case (void)
342 {
343   temp_case = xmalloc (vfm_sink_info.case_size);
344
345   if (compaction_necessary)
346     compaction_case = xmalloc (sizeof (struct ccase)
347                                + sizeof (union value) * (compaction_nval - 1));
348 }
349
350 #if DEBUGGING
351 /* Returns the name of the variable that owns the index CCASE_INDEX
352    into ccase. */
353 static const char *
354 index_to_varname (int ccase_index)
355 {
356   int i;
357
358   for (i = 0; i < default_dict.nvar; i++)
359     {
360       struct variable *v = default_dict.var[i];
361       
362       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
363         return default_dict.var[i]->name;
364     }
365   return _("<NOVAR>");
366 }
367 #endif
368
369 /* Initializes temp_case from the vectors that say which `value's need
370    to be initialized just once, and which ones need to be
371    re-initialized before every case. */
372 static void
373 vector_initialization (void)
374 {
375   int i;
376   long *lp;
377   
378   /* Just once. */
379   for (i = 0; i < init_zero.n; i++)
380     temp_case->data[init_zero.vec[i]].f = 0.0;
381   for (i = 0; i < init_blanks.n; i++)
382     memset (temp_case->data[init_blanks.vec[i]].s, ' ', MAX_SHORT_STRING);
383
384   /* These vectors need to be repeatedly accessed, so we add a
385      sentinel to (hopefully) improve speed. */
386   vec_insert (&reinit_sysmis, -1);
387   vec_insert (&reinit_blanks, -1);
388
389   for (lp = reinit_sysmis.vec; *lp != -1;)
390     temp_case->data[*lp++].f = SYSMIS;
391   for (lp = reinit_blanks.vec; *lp != -1;)
392     memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
393   
394 #if DEBUGGING
395   printf ("vfm: init_zero=");
396   for (i = 0; i < init_zero.n; i++)
397     printf ("%s%s", i ? "," : "", index_to_varname (init_zero.vec[i]));
398   printf (" init_blanks=");
399   for (i = 0; i < init_blanks.n; i++)
400     printf ("%s%s", i ? "," : "", index_to_varname (init_blanks.vec[i]));
401   printf (" reinit_sysmis=");
402   for (lp = reinit_sysmis.vec; *lp != -1; lp++)
403     printf ("%s%s", lp != reinit_sysmis.vec ? "," : "",
404             index_to_varname (*lp));
405   printf (" reinit_blanks=");
406   for (lp = reinit_blanks.vec; *lp != -1; lp++)
407     printf ("%s%s", lp != reinit_blanks.vec ? "," : "",
408             index_to_varname (*lp));
409   printf ("\n");
410 #endif
411 }
412
413 /* Sets filter_index to an appropriate value. */
414 static void
415 setup_filter (void)
416 {
417   filter_var = dict_get_filter (default_dict);
418   
419   if (filter_var != NULL)
420     {
421       assert (filter_var->type == NUMERIC);
422       filter_index = filter_var->index;
423     } else {
424       filter_index = -1;
425     }
426 }
427
428 /* Sets all the lag-related variables based on value of n_lag. */
429 static void
430 setup_lag (void)
431 {
432   int i;
433   
434   if (n_lag == 0)
435     return;
436
437   lag_count = 0;
438   lag_head = 0;
439   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
440   for (i = 0; i < n_lag; i++)
441     lag_queue[i] = xmalloc (dict_get_value_cnt (temp_dict)
442                             * sizeof **lag_queue);
443 }
444
445 /* There is a lot of potential confusion in the vfm and related
446    routines over the number of `value's at each stage of the process.
447    Here is each nval count, with explanation, as set up by
448    open_active_file():
449
450    vfm_source_info.nval: Number of `value's in the cases returned by
451    the source stream.  This value turns out not to be very useful, but
452    we maintain it anyway.
453
454    vfm_sink_info.nval: Number of `value's in the cases after all
455    transformations have been performed.  Never less than
456    vfm_source_info.nval.
457
458    temp_dict->nval: Number of `value's in the cases after the
459    transformations leading up to TEMPORARY have been performed.  If
460    TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
461    Never less than vfm_sink_info.nval.
462
463    compaction_nval: Number of `value's in the cases after the
464    transformations leading up to TEMPORARY have been performed and the
465    case has been compacted by compact_case(), if compaction is
466    necessary.  This the number of `value's in the cases saved by the
467    sink stream.  (However, note that the cases passed to the sink
468    stream have not yet been compacted.  It is the responsibility of
469    the data sink to call compact_case().)  This may be less than,
470    greater than, or equal to vfm_source_info.nval.  `compaction'
471    becomes the new value of default_dict.nval after the procedure is
472    completed.
473
474    default_dict.nval: This is often an alias for temp_dict->nval.  As
475    such it can really have no separate existence until the procedure
476    is complete.  For this reason it should *not* be referenced inside
477    the execution of a procedure. */
478 /* Makes all preparations for reading from the data source and writing
479    to the data sink. */
480 static void
481 open_active_file (void)
482 {
483   /* Sometimes we want to refer to the dictionary that applies to the
484      data actually written to the sink.  This is either temp_dict or
485      default_dict.  However, if TEMPORARY is not on, then temp_dict
486      does not apply.  So, we can set temp_dict to default_dict in this
487      case. */
488   if (!temporary)
489     {
490       temp_trns = n_trns;
491       temp_dict = default_dict;
492     }
493
494   /* No cases passed to the procedure yet. */
495   case_count = 0;
496
497   /* The rest. */
498   prepare_for_writing ();
499   arrange_compaction ();
500   make_temp_case ();
501   vector_initialization ();
502   discard_ctl_stack ();
503   setup_filter ();
504   setup_lag ();
505
506   /* Debug output. */
507   debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
508                  vfm_source->name, vfm_sink->name));
509   debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
510                  "temp_dict->nval=%d, compaction_nval=%d, "
511                  "default_dict.nval=%d\n",
512                  vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
513                  compaction_nval, default_dict.nval));
514 }
515 \f
516 /* Closes the active file. */
517 static void
518 close_active_file (void)
519 {
520   /* Close the current case group. */
521   if (case_count && end_func != NULL)
522     end_func ();
523
524   /* Stop lagging (catch up?). */
525   if (n_lag)
526     {
527       int i;
528       
529       for (i = 0; i < n_lag; i++)
530         free (lag_queue[i]);
531       free (lag_queue);
532       n_lag = 0;
533     }
534   
535   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
536      off TEMPORARY. */
537   if (temporary)
538     {
539       dict_destroy (default_dict);
540       default_dict = temp_dict;
541       temp_dict = NULL;
542     }
543
544   /* Finish compaction. */
545   if (compaction_necessary)
546     finish_compaction ();
547     
548   /* Old data sink --> New data source. */
549   if (vfm_source && vfm_source->destroy_source)
550     vfm_source->destroy_source ();
551   
552   vfm_source = vfm_sink;
553   vfm_source_info.ncases = vfm_sink_info.ncases;
554   vfm_source_info.nval = compaction_nval;
555   vfm_source_info.case_size = (sizeof (struct ccase)
556                                + (compaction_nval - 1) * sizeof (union value));
557   if (vfm_source->mode)
558     vfm_source->mode ();
559
560   /* Old data sink is gone now. */
561   vfm_sink = NULL;
562
563   /* Cancel TEMPORARY. */
564   cancel_temporary ();
565
566   /* Free temporary cases. */
567   free (temp_case);
568   temp_case = NULL;
569
570   free (compaction_case);
571   compaction_case = NULL;
572
573   /* Cancel PROCESS IF. */
574   expr_free (process_if_expr);
575   process_if_expr = NULL;
576
577   /* Cancel FILTER if temporary. */
578   if (filter_var != NULL && !FILTER_before_TEMPORARY)
579     dict_set_filter (default_dict, NULL);
580
581   /* Cancel transformations. */
582   cancel_transformations ();
583
584   /* Clear value-initialization vectors. */
585   vec_clear (&init_zero);
586   vec_clear (&init_blanks);
587   vec_clear (&reinit_sysmis);
588   vec_clear (&reinit_blanks);
589
590   /* Turn off case limiter. */
591   dict_set_case_limit (default_dict, 0);
592
593   /* Clear VECTOR vectors. */
594   dict_clear_vectors (default_dict);
595
596   debug_printf (("vfm: procedure complete\n\n"));
597 }
598 \f
599 /* Disk case stream. */
600
601 /* Associated files. */
602 FILE *disk_source_file;
603 FILE *disk_sink_file;
604
605 /* Initializes the disk sink. */
606 static void
607 disk_stream_init (void)
608 {
609   disk_sink_file = tmpfile ();
610   if (!disk_sink_file)
611     {
612       msg (ME, _("An error occurred attempting to create a temporary "
613                  "file for use as the active file: %s."),
614            strerror (errno));
615       err_failure ();
616     }
617 }
618
619 /* Reads all cases from the disk source and passes them one by one to
620    write_case(). */
621 static void
622 disk_stream_read (void)
623 {
624   int i;
625
626   for (i = 0; i < vfm_source_info.ncases; i++)
627     {
628       if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
629         {
630           msg (ME, _("An error occurred while attempting to read from "
631                "a temporary file created for the active file: %s."),
632                strerror (errno));
633           err_failure ();
634           return;
635         }
636
637       if (!write_case ())
638         return;
639     }
640 }
641
642 /* Writes temp_case to the disk sink. */
643 static void
644 disk_stream_write (void)
645 {
646   union value *src_case;
647
648   if (compaction_necessary)
649     {
650       compact_case (compaction_case, temp_case);
651       src_case = (union value *) compaction_case;
652     }
653   else src_case = (union value *) temp_case;
654
655   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
656               disk_sink_file) != 1)
657     {
658       msg (ME, _("An error occurred while attempting to write to a "
659                  "temporary file used as the active file: %s."),
660            strerror (errno));
661       err_failure ();
662     }
663 }
664
665 /* Switches the stream from a sink to a source. */
666 static void
667 disk_stream_mode (void)
668 {
669   /* Rewind the sink. */
670   if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
671     {
672       msg (ME, _("An error occurred while attempting to rewind a "
673                  "temporary file used as the active file: %s."),
674            strerror (errno));
675       err_failure ();
676     }
677   
678   /* Sink --> source variables. */
679   disk_source_file = disk_sink_file;
680 }
681
682 /* Destroys the source's internal data. */
683 static void
684 disk_stream_destroy_source (void)
685 {
686   if (disk_source_file)
687     {
688       fclose (disk_source_file);
689       disk_source_file = NULL;
690     }
691 }
692
693 /* Destroys the sink's internal data. */
694 static void
695 disk_stream_destroy_sink (void)
696 {
697   if (disk_sink_file)
698     {
699       fclose (disk_sink_file);
700       disk_sink_file = NULL;
701     }
702 }
703
704 /* Disk stream. */
705 struct case_stream vfm_disk_stream = 
706   {
707     disk_stream_init,
708     disk_stream_read,
709     disk_stream_write,
710     disk_stream_mode,
711     disk_stream_destroy_source,
712     disk_stream_destroy_sink,
713     "disk",
714   };
715 \f
716 /* Memory case stream. */
717
718 /* List of cases stored in the stream. */
719 struct case_list *memory_source_cases;
720 struct case_list *memory_sink_cases;
721
722 /* Current case. */
723 struct case_list *memory_sink_iter;
724
725 /* Maximum number of cases. */
726 int memory_sink_max_cases;
727
728 /* Initializes the memory stream variables for writing. */
729 static void
730 memory_stream_init (void)
731 {
732   memory_sink_cases = NULL;
733   memory_sink_iter = NULL;
734   
735   assert (compaction_nval);
736   memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
737 }
738
739 /* Reads the case stream from memory and passes it to write_case(). */
740 static void
741 memory_stream_read (void)
742 {
743   while (memory_source_cases != NULL)
744     {
745       memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
746       
747       {
748         struct case_list *current = memory_source_cases;
749         memory_source_cases = memory_source_cases->next;
750         free (current);
751       }
752       
753       if (!write_case ())
754         return;
755     }
756 }
757
758 /* Writes temp_case to the memory stream. */
759 static void
760 memory_stream_write (void)
761 {
762   struct case_list *new_case = malloc (sizeof (struct case_list)
763                                        + ((compaction_nval - 1)
764                                           * sizeof (union value)));
765
766   /* If we've got memory to spare then add it to the linked list. */
767   if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
768     {
769       if (compaction_necessary)
770         compact_case (&new_case->c, temp_case);
771       else
772         memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
773
774       /* Append case to linked list. */
775       if (memory_sink_cases)
776         memory_sink_iter = memory_sink_iter->next = new_case;
777       else
778         memory_sink_iter = memory_sink_cases = new_case;
779     }
780   else
781     {
782       /* Out of memory.  Write the active file to disk. */
783       struct case_list *cur, *next;
784
785       /* Notify the user. */
786       if (!new_case)
787         msg (MW, _("Virtual memory exhausted.  Paging active file "
788                    "to disk."));
789       else
790         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
791                    "overflowed.  Paging active file to disk."),
792              MAX_WORKSPACE / 1024, memory_sink_max_cases,
793              compaction_nval * sizeof (union value));
794
795       free (new_case);
796
797       /* Switch to a disk sink. */
798       vfm_sink = &vfm_disk_stream;
799       vfm_sink->init ();
800       paging = 1;
801
802       /* Terminate the list. */
803       if (memory_sink_iter)
804         memory_sink_iter->next = NULL;
805
806       /* Write the cases to disk and destroy them.  We can't call
807          vfm->sink->write() because of compaction. */
808       for (cur = memory_sink_cases; cur; cur = next)
809         {
810           next = cur->next;
811           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
812                       disk_sink_file) != 1)
813             {
814               msg (ME, _("An error occurred while attempting to "
815                          "write to a temporary file created as the "
816                          "active file, while paging to disk: %s."),
817                    strerror (errno));
818               err_failure ();
819             }
820           free (cur);
821         }
822
823       /* Write the current case to disk. */
824       vfm_sink->write ();
825     }
826 }
827
828 /* If the data is stored in memory, causes it to be written to disk.
829    To be called only *between* procedure()s, not within them. */
830 void
831 page_to_disk (void)
832 {
833   if (vfm_source == &vfm_memory_stream)
834     {
835       /* Switch to a disk sink. */
836       vfm_sink = &vfm_disk_stream;
837       vfm_sink->init ();
838       paging = 1;
839       
840       /* Write the cases to disk and destroy them.  We can't call
841          vfm->sink->write() because of compaction. */
842       {
843         struct case_list *cur, *next;
844         
845         for (cur = memory_source_cases; cur; cur = next)
846           {
847             next = cur->next;
848             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
849                         disk_sink_file) != 1)
850               {
851                 msg (ME, _("An error occurred while attempting to "
852                            "write to a temporary file created as the "
853                            "active file, while paging to disk: %s."),
854                      strerror (errno));
855                 err_failure ();
856               }
857             free (cur);
858           }
859       }
860       
861       vfm_source = &vfm_disk_stream;
862       vfm_source->mode ();
863
864       vfm_sink = NULL;
865     }
866 }
867
868 /* Switch the memory stream from sink to source mode. */
869 static void
870 memory_stream_mode (void)
871 {
872   /* Terminate the list. */
873   if (memory_sink_iter)
874     memory_sink_iter->next = NULL;
875
876   /* Sink --> source variables. */
877   memory_source_cases = memory_sink_cases;
878   memory_sink_cases = NULL;
879 }
880
881 /* Destroy all memory source data. */
882 static void
883 memory_stream_destroy_source (void)
884 {
885   struct case_list *cur, *next;
886   
887   for (cur = memory_source_cases; cur; cur = next)
888     {
889       next = cur->next;
890       free (cur);
891     }
892   memory_source_cases = NULL;
893 }
894
895 /* Destroy all memory sink data. */
896 static void
897 memory_stream_destroy_sink (void)
898 {
899   struct case_list *cur, *next;
900   
901   for (cur = memory_sink_cases; cur; cur = next)
902     {
903       next = cur->next;
904       free (cur);
905     }
906   memory_sink_cases = NULL;
907 }
908   
909 /* Memory stream. */
910 struct case_stream vfm_memory_stream = 
911   {
912     memory_stream_init,
913     memory_stream_read,
914     memory_stream_write,
915     memory_stream_mode,
916     memory_stream_destroy_source,
917     memory_stream_destroy_sink,
918     "memory",
919   };
920 \f
921 #include "debug-print.h"
922
923 /* Add temp_case to the lag queue. */
924 static void
925 lag_case (void)
926 {
927   if (lag_count < n_lag)
928     lag_count++;
929   memcpy (lag_queue[lag_head], temp_case,
930           sizeof (union value) * dict_get_value_cnt (temp_dict));
931   if (++lag_head >= n_lag)
932     lag_head = 0;
933 }
934
935 /* Returns a pointer to the lagged case from N_BEFORE cases before the
936    current one, or NULL if there haven't been that many cases yet. */
937 struct ccase *
938 lagged_case (int n_before)
939 {
940   assert (n_before <= n_lag);
941   if (n_before > lag_count)
942     return NULL;
943   
944   {
945     int index = lag_head - n_before;
946     if (index < 0)
947       index += n_lag;
948     return lag_queue[index];
949   }
950 }
951    
952 /* Transforms temp_case and writes it to the replacement active file
953    if advisable.  Returns nonzero if more cases can be accepted, zero
954    otherwise.  Do not call this function again after it has returned
955    zero once.  */
956 int
957 procedure_write_case (void)
958 {
959   /* Index of current transformation. */
960   int cur_trns;
961
962   /* Return value: whether it's reasonable to write any more cases. */
963   int more_cases = 1;
964
965   debug_printf ((_("transform: ")));
966
967   cur_trns = f_trns;
968   for (;;)
969     {
970       /* Output the case if this is temp_trns. */
971       if (cur_trns == temp_trns)
972         {
973           debug_printf (("REC"));
974
975           if (n_lag)
976             lag_case ();
977           
978           vfm_sink_info.ncases++;
979           vfm_sink->write ();
980
981           if (dict_get_case_limit (default_dict))
982             more_cases = (vfm_sink_info.ncases
983                           < dict_get_case_limit (default_dict));
984         }
985
986       /* Are we done? */
987       if (cur_trns >= n_trns)
988         break;
989       
990       debug_printf (("$%d", cur_trns));
991
992       /* Decide which transformation should come next. */
993       {
994         int code;
995         
996         code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
997         switch (code)
998           {
999           case -1:
1000             /* Next transformation. */
1001             cur_trns++;
1002             break;
1003           case -2:
1004             /* Delete this case. */
1005             goto done;
1006           default:
1007             /* Go to that transformation. */
1008             cur_trns = code;
1009             break;
1010           }
1011       }
1012     }
1013
1014   /* Call the beginning of group function. */
1015   if (!case_count && begin_func != NULL)
1016     begin_func ();
1017
1018   /* Call the procedure if there is one and FILTER and PROCESS IF
1019      don't prohibit it. */
1020   if (proc_func != NULL
1021       && !FILTERED
1022       && (process_if_expr == NULL ||
1023           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
1024     proc_func (temp_case);
1025
1026   case_count++;
1027   
1028 done:
1029   debug_putc ('\n', stdout);
1030   
1031   {
1032     long *lp;
1033
1034     /* This case is finished.  Initialize the variables for the next case. */
1035     for (lp = reinit_sysmis.vec; *lp != -1;)
1036       temp_case->data[*lp++].f = SYSMIS;
1037     for (lp = reinit_blanks.vec; *lp != -1;)
1038       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
1039   }
1040   
1041   /* Return previously determined value. */
1042   return more_cases;
1043 }
1044
1045 /* Appends TRNS to t_trns[], the list of all transformations to be
1046    performed on data as it is read from the active file. */
1047 void
1048 add_transformation (struct trns_header * trns)
1049 {
1050   if (n_trns >= m_trns)
1051     {
1052       m_trns += 16;
1053       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1054     }
1055   t_trns[n_trns] = trns;
1056   trns->index = n_trns++;
1057 }
1058
1059 /* Cancels all active transformations, including any transformations
1060    created by the input program. */
1061 void
1062 cancel_transformations (void)
1063 {
1064   int i;
1065   for (i = 0; i < n_trns; i++)
1066     {
1067       if (t_trns[i]->free)
1068         t_trns[i]->free (t_trns[i]);
1069       free (t_trns[i]);
1070     }
1071   n_trns = f_trns = 0;
1072   if (m_trns > 32)
1073     {
1074       free (t_trns);
1075       m_trns = 0;
1076     }
1077 }
1078
1079 /* Dumps out the values of all the split variables for the case C. */
1080 static void
1081 dump_splits (struct ccase *c)
1082 {
1083   struct variable *const *split;
1084   struct tab_table *t;
1085   size_t split_cnt;
1086   int i;
1087
1088   split_cnt = dict_get_split_cnt (default_dict);
1089   t = tab_create (3, split_cnt + 1, 0);
1090   tab_dim (t, tab_natural_dimensions);
1091   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
1092   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
1093   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1094   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1095   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1096   split = dict_get_split_vars (default_dict);
1097   for (i = 0; i < split_cnt; i++)
1098     {
1099       struct variable *v = split[i];
1100       char temp_buf[80];
1101       const char *val_lab;
1102
1103       assert (v->type == NUMERIC || v->type == ALPHA);
1104       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1105       
1106       {
1107         union value val = c->data[v->fv];
1108         if (v->type == ALPHA)
1109           val.c = c->data[v->fv].s;
1110         data_out (temp_buf, &v->print, &val);
1111       }
1112       
1113       temp_buf[v->print.w] = 0;
1114       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1115
1116       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1117       if (val_lab)
1118         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1119     }
1120   tab_flags (t, SOMF_NO_TITLE);
1121   tab_submit (t);
1122 }
1123
1124 /* This procfunc is substituted for the user-supplied procfunc when
1125    SPLIT FILE is active.  This function forms a wrapper around that
1126    procfunc by dividing the input into series. */
1127 static int
1128 SPLIT_FILE_procfunc (struct ccase *c)
1129 {
1130   static struct ccase *prev_case;
1131   struct variable *const *split;
1132   size_t split_cnt;
1133   size_t i;
1134
1135   /* The first case always begins a new series.  We also need to
1136      preserve the values of the case for later comparison. */
1137   if (case_count == 0)
1138     {
1139       if (prev_case)
1140         free (prev_case);
1141       prev_case = xmalloc (vfm_sink_info.case_size);
1142       memcpy (prev_case, c, vfm_sink_info.case_size);
1143
1144       dump_splits (c);
1145       if (virt_begin_func != NULL)
1146         virt_begin_func ();
1147       
1148       return virt_proc_func (c);
1149     }
1150
1151   /* Compare the value of each SPLIT FILE variable to the values on
1152      the previous case. */
1153   split = dict_get_split_vars (default_dict);
1154   split_cnt = dict_get_split_cnt (default_dict);
1155   for (i = 0; i < split_cnt; i++)
1156     {
1157       struct variable *v = split[i];
1158       
1159       switch (v->type)
1160         {
1161         case NUMERIC:
1162           if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
1163             goto not_equal;
1164           break;
1165         case ALPHA:
1166           if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
1167             goto not_equal;
1168           break;
1169         default:
1170           assert (0);
1171         }
1172     }
1173   return virt_proc_func (c);
1174   
1175 not_equal:
1176   /* The values of the SPLIT FILE variable are different from the
1177      values on the previous case.  That means that it's time to begin
1178      a new series. */
1179   if (end_func != NULL)
1180     end_func ();
1181   dump_splits (c);
1182   if (virt_begin_func != NULL)
1183     virt_begin_func ();
1184   memcpy (prev_case, c, vfm_sink_info.case_size);
1185   return virt_proc_func (c);
1186 }
1187 \f
1188 /* Case compaction. */
1189
1190 /* Copies case SRC to case DEST, compacting it in the process. */
1191 void
1192 compact_case (struct ccase *dest, const struct ccase *src)
1193 {
1194   int i;
1195   int nval = 0;
1196   size_t var_cnt;
1197   
1198   assert (compaction_necessary);
1199
1200   if (temporary == 2)
1201     {
1202       if (dest != compaction_case)
1203         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1204       return;
1205     }
1206
1207   /* Copy all the variables except the scratch variables from SRC to
1208      DEST. */
1209   var_cnt = dict_get_var_cnt (default_dict);
1210   for (i = 0; i < var_cnt; i++)
1211     {
1212       struct variable *v = dict_get_var (default_dict, i);
1213       
1214       if (v->name[0] == '#')
1215         continue;
1216
1217       if (v->type == NUMERIC)
1218         dest->data[nval++] = src->data[v->fv];
1219       else
1220         {
1221           int w = DIV_RND_UP (v->width, sizeof (union value));
1222           
1223           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1224           nval += w;
1225         }
1226     }
1227 }
1228
1229 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1230 static void
1231 finish_compaction (void)
1232 {
1233   int i;
1234
1235   for (i = 0; i < dict_get_var_cnt (default_dict); )
1236     {
1237       struct variable *v = dict_get_var (default_dict, i);
1238
1239       if (v->name[0] == '#') 
1240         dict_delete_var (default_dict, v);
1241       else
1242         i++;
1243     }
1244   dict_compact_values (default_dict);
1245 }
1246
1247