3fdefb5db2940dd609ab813133d95207d0a8263f
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 /* AIX requires this to be the first thing in the file.  */
21 #include <config.h>
22 #if __GNUC__
23 #define alloca __builtin_alloca
24 #else
25 #if HAVE_ALLOCA_H
26 #include <alloca.h>
27 #else
28 #ifdef _AIX
29 #pragma alloca
30 #else
31 #ifndef alloca                  /* predefined by HP cc +Olibcalls */
32 char *alloca ();
33 #endif
34 #endif
35 #endif
36 #endif
37
38 #include <assert.h>
39 #include <errno.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #if HAVE_UNISTD_H
43 #include <unistd.h>     /* Required by SunOS4. */
44 #endif
45 #include "alloc.h"
46 #include "approx.h"
47 #include "do-ifP.h"
48 #include "error.h"
49 #include "expr.h"
50 #include "misc.h"
51 #include "random.h"
52 #include "som.h"
53 #include "str.h"
54 #include "tab.h"
55 #include "var.h"
56 #include "vector.h"
57 #include "value-labels.h"
58 #include "vfm.h"
59 #include "vfmP.h"
60
61 /*
62    Virtual File Manager (vfm):
63
64    vfm is used to process data files.  It uses the model that data is
65    read from one stream (the data source), then written to another
66    (the data sink).  The data source is then deleted and the data sink
67    becomes the data source for the next procedure. */
68
69 #include "debug-print.h"
70
71 /* This is used to read from the active file. */
72 struct case_stream *vfm_source;
73
74 /* `value' indexes to initialize to particular values for certain cases. */
75 struct long_vec reinit_sysmis;          /* SYSMIS for every case. */
76 struct long_vec reinit_blanks;          /* Blanks for every case. */
77 struct long_vec init_zero;              /* Zero for first case only. */
78 struct long_vec init_blanks;            /* Blanks for first case only. */
79
80 /* This is used to write to the replacement active file. */
81 struct case_stream *vfm_sink;
82
83 /* Information about the data source. */
84 struct stream_info vfm_source_info;
85
86 /* Information about the data sink. */
87 struct stream_info vfm_sink_info;
88
89 /* Filter variable and  `value' index. */
90 static struct variable *filter_var;
91 static int filter_index;
92
93 #define FILTERED                                                        \
94         (filter_index != -1                                             \
95          && (temp_case->data[filter_index].f == 0.0                     \
96              || temp_case->data[filter_index].f == SYSMIS               \
97              || is_num_user_missing (temp_case->data[filter_index].f,   \
98                                      filter_var)))
99
100 /* Nonzero if the case needs to have values deleted before being
101    stored, zero otherwise. */
102 int compaction_necessary;
103
104 /* Number of values after compaction, or the same as
105    vfm_sink_info.nval, if compaction is not necessary. */
106 int compaction_nval;
107
108 /* Temporary case buffer with enough room for `compaction_nval'
109    `value's. */
110 struct ccase *compaction_case;
111
112 /* Within a session, when paging is turned on, it is never turned back
113    off.  This policy might be too aggressive. */
114 static int paging = 0;
115
116 /* Time at which vfm was last invoked. */
117 time_t last_vfm_invocation;
118
119 /* Functions called during procedure processing. */
120 static int (*proc_func) (struct ccase *);       /* Called for each case. */
121 static int (*virt_proc_func) (struct ccase *);  /* From SPLIT_FILE_procfunc. */
122 static void (*begin_func) (void);       /* Called at beginning of a series. */
123 static void (*virt_begin_func) (void);  /* Called by SPLIT_FILE_procfunc. */
124 static void (*end_func) (void); /* Called after end of a series. */
125 int (*write_case) (void);
126
127 /* Number of cases passed to proc_func(). */
128 static int case_count;
129
130 /* Lag queue. */
131 int n_lag;                      /* Number of cases to lag. */
132 static int lag_count;           /* Number of cases in lag_queue so far. */
133 static int lag_head;            /* Index where next case will be added. */
134 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
135
136 static void open_active_file (void);
137 static void close_active_file (void);
138 static int SPLIT_FILE_procfunc (struct ccase *);
139 static void finish_compaction (void);
140 static void lag_case (void);
141 static int procedure_write_case (void);
142 \f
143 /* Public functions. */
144
145 /* Reads all the cases from the active file, transforms them by the
146    active set of transformations, calls PROCFUNC with CURCASE set to
147    the case and CASENUM set to the case number, and writes them to a
148    new active file.
149
150    Divides the active file into zero or more series of one or more
151    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
152    called after each series. */
153 void
154 procedure (void (*beginfunc) (void),
155            int (*procfunc) (struct ccase *curcase),
156            void (*endfunc) (void))
157 {
158   end_func = endfunc;
159   write_case = procedure_write_case;
160
161   if (default_dict.n_splits && procfunc != NULL)
162     {
163       virt_proc_func = procfunc;
164       proc_func = SPLIT_FILE_procfunc;
165       
166       virt_begin_func = beginfunc;
167       begin_func = NULL;
168     } else {
169       begin_func = beginfunc;
170       proc_func = procfunc;
171     }
172
173   last_vfm_invocation = time (NULL);
174
175   open_active_file ();
176   vfm_source->read ();
177   close_active_file ();
178 }
179 \f
180 /* Active file processing support.  Subtly different semantics from
181    procedure(). */
182
183 static int process_active_file_write_case (void);
184
185 /* The casefunc might want us to stop calling it. */
186 static int not_canceled;
187
188 /* Reads all the cases from the active file and passes them one-by-one
189    to CASEFUNC in temp_case.  Before any cases are passed, calls
190    BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
191    BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
192    file by calling process_active_file_output_case().
193
194    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
195 void
196 process_active_file (void (*beginfunc) (void),
197                      int (*casefunc) (struct ccase *curcase),
198                      void (*endfunc) (void))
199 {
200   proc_func = casefunc;
201   write_case = process_active_file_write_case;
202   not_canceled = 1;
203
204   open_active_file ();
205   beginfunc ();
206   
207   /* There doesn't necessarily need to be an active file. */
208   if (vfm_source)
209     vfm_source->read ();
210   
211   endfunc ();
212   close_active_file ();
213 }
214
215 /* Pass the current case to casefunc. */
216 static int
217 process_active_file_write_case (void)
218 {
219   /* Index of current transformation. */
220   int cur_trns;
221
222   for (cur_trns = f_trns ; cur_trns != temp_trns; )
223     {
224       int code;
225         
226       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
227       switch (code)
228         {
229         case -1:
230           /* Next transformation. */
231           cur_trns++;
232           break;
233         case -2:
234           /* Delete this case. */
235           goto done;
236         default:
237           /* Go to that transformation. */
238           cur_trns = code;
239           break;
240         }
241     }
242
243   if (n_lag)
244     lag_case ();
245           
246   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
247   if (not_canceled
248       && !FILTERED
249       && (process_if_expr == NULL ||
250           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
251     not_canceled = proc_func (temp_case);
252   
253   case_count++;
254   
255  done:
256   {
257     long *lp;
258
259     /* This case is finished.  Initialize the variables for the next case. */
260     for (lp = reinit_sysmis.vec; *lp != -1;)
261       temp_case->data[*lp++].f = SYSMIS;
262     for (lp = reinit_blanks.vec; *lp != -1;)
263       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
264   }
265   
266   return 1;
267 }
268
269 /* Write temp_case to the active file. */
270 void
271 process_active_file_output_case (void)
272 {
273   vfm_sink_info.ncases++;
274   vfm_sink->write ();
275 }
276 \f
277 /* Opening the active file. */
278
279 /* It might be usefully noted that the following several functions are
280    given in the order that they are called by open_active_file(). */
281
282 /* Prepare to write to the replacement active file. */
283 static void
284 prepare_for_writing (void)
285 {
286   /* FIXME: If ALL the conditions listed below hold true, then the
287      replacement active file is guaranteed to be identical to the
288      original active file:
289
290      1. TEMPORARY was the first transformation, OR, there were no
291      transformations at all.
292
293      2. Input is not coming from an input program.
294
295      3. Compaction is not necessary.
296
297      So, in this case, we shouldn't have to replace the active
298      file--it's just a waste of time and space. */
299
300   vfm_sink_info.ncases = 0;
301   vfm_sink_info.nval = default_dict.nval;
302   vfm_sink_info.case_size = (sizeof (struct ccase)
303                              + (default_dict.nval - 1) * sizeof (union value));
304   
305   if (vfm_sink == NULL)
306     {
307       if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
308           && !paging)
309         {
310           msg (MW, _("Workspace overflow predicted.  Max workspace is "
311                      "currently set to %d KB (%d cases at %d bytes each).  "
312                      "Paging active file to disk."),
313                MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
314                vfm_sink_info.case_size);
315           
316           paging = 1;
317         }
318       
319       vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
320     }
321 }
322
323 /* Arrange for compacting the output cases for storage. */
324 static void
325 arrange_compaction (void)
326 {
327   int count_values = 0;
328
329   {
330     int i;
331     
332     /* Count up the number of `value's that will be output. */
333     for (i = 0; i < temp_dict->nvar; i++)
334       if (temp_dict->var[i]->name[0] != '#')
335         {
336           assert (temp_dict->var[i]->nv > 0);
337           count_values += temp_dict->var[i]->nv;
338         }
339     assert (temporary == 2 || count_values <= temp_dict->nval);
340   }
341   
342   /* Compaction is only necessary if the number of `value's to output
343      differs from the number already present. */
344   compaction_nval = count_values;
345   compaction_necessary = temporary == 2 || count_values != temp_dict->nval;
346   
347   if (vfm_sink->init)
348     vfm_sink->init ();
349 }
350
351 /* Prepares the temporary case and compaction case. */
352 static void
353 make_temp_case (void)
354 {
355   temp_case = xmalloc (vfm_sink_info.case_size);
356
357   if (compaction_necessary)
358     compaction_case = xmalloc (sizeof (struct ccase)
359                                + sizeof (union value) * (compaction_nval - 1));
360 }
361
362 #if DEBUGGING
363 /* Returns the name of the variable that owns the index CCASE_INDEX
364    into ccase. */
365 static const char *
366 index_to_varname (int ccase_index)
367 {
368   int i;
369
370   for (i = 0; i < default_dict.nvar; i++)
371     {
372       struct variable *v = default_dict.var[i];
373       
374       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
375         return default_dict.var[i]->name;
376     }
377   return _("<NOVAR>");
378 }
379 #endif
380
381 /* Initializes temp_case from the vectors that say which `value's need
382    to be initialized just once, and which ones need to be
383    re-initialized before every case. */
384 static void
385 vector_initialization (void)
386 {
387   int i;
388   long *lp;
389   
390   /* Just once. */
391   for (i = 0; i < init_zero.n; i++)
392     temp_case->data[init_zero.vec[i]].f = 0.0;
393   for (i = 0; i < init_blanks.n; i++)
394     memset (temp_case->data[init_blanks.vec[i]].s, ' ', MAX_SHORT_STRING);
395
396   /* These vectors need to be repeatedly accessed, so we add a
397      sentinel to (hopefully) improve speed. */
398   vec_insert (&reinit_sysmis, -1);
399   vec_insert (&reinit_blanks, -1);
400
401   for (lp = reinit_sysmis.vec; *lp != -1;)
402     temp_case->data[*lp++].f = SYSMIS;
403   for (lp = reinit_blanks.vec; *lp != -1;)
404     memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
405   
406 #if DEBUGGING
407   printf ("vfm: init_zero=");
408   for (i = 0; i < init_zero.n; i++)
409     printf ("%s%s", i ? "," : "", index_to_varname (init_zero.vec[i]));
410   printf (" init_blanks=");
411   for (i = 0; i < init_blanks.n; i++)
412     printf ("%s%s", i ? "," : "", index_to_varname (init_blanks.vec[i]));
413   printf (" reinit_sysmis=");
414   for (lp = reinit_sysmis.vec; *lp != -1; lp++)
415     printf ("%s%s", lp != reinit_sysmis.vec ? "," : "",
416             index_to_varname (*lp));
417   printf (" reinit_blanks=");
418   for (lp = reinit_blanks.vec; *lp != -1; lp++)
419     printf ("%s%s", lp != reinit_blanks.vec ? "," : "",
420             index_to_varname (*lp));
421   printf ("\n");
422 #endif
423 }
424
425 /* Sets filter_index to an appropriate value. */
426 static void
427 setup_filter (void)
428 {
429   filter_index = -1;
430   
431   if (default_dict.filter_var[0])
432     {
433       struct variable *fv = find_variable (default_dict.filter_var);
434       
435       if (fv == NULL || fv->type == ALPHA)
436         default_dict.filter_var[0] = 0;
437       else
438         {
439           filter_index = fv->index;
440           filter_var = fv;
441         }
442     }
443 }
444
445 /* Sets all the lag-related variables based on value of n_lag. */
446 static void
447 setup_lag (void)
448 {
449   int i;
450   
451   if (n_lag == 0)
452     return;
453
454   lag_count = 0;
455   lag_head = 0;
456   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
457   for (i = 0; i < n_lag; i++)
458     lag_queue[i] = xmalloc (temp_dict->nval * sizeof **lag_queue);
459 }
460
461 /* There is a lot of potential confusion in the vfm and related
462    routines over the number of `value's at each stage of the process.
463    Here is each nval count, with explanation, as set up by
464    open_active_file():
465
466    vfm_source_info.nval: Number of `value's in the cases returned by
467    the source stream.  This value turns out not to be very useful, but
468    we maintain it anyway.
469
470    vfm_sink_info.nval: Number of `value's in the cases after all
471    transformations have been performed.  Never less than
472    vfm_source_info.nval.
473
474    temp_dict->nval: Number of `value's in the cases after the
475    transformations leading up to TEMPORARY have been performed.  If
476    TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
477    Never less than vfm_sink_info.nval.
478
479    compaction_nval: Number of `value's in the cases after the
480    transformations leading up to TEMPORARY have been performed and the
481    case has been compacted by compact_case(), if compaction is
482    necessary.  This the number of `value's in the cases saved by the
483    sink stream.  (However, note that the cases passed to the sink
484    stream have not yet been compacted.  It is the responsibility of
485    the data sink to call compact_case().)  This may be less than,
486    greater than, or equal to vfm_source_info.nval.  `compaction'
487    becomes the new value of default_dict.nval after the procedure is
488    completed.
489
490    default_dict.nval: This is often an alias for temp_dict->nval.  As
491    such it can really have no separate existence until the procedure
492    is complete.  For this reason it should *not* be referenced inside
493    the execution of a procedure. */
494 /* Makes all preparations for reading from the data source and writing
495    to the data sink. */
496 static void
497 open_active_file (void)
498 {
499   /* Sometimes we want to refer to the dictionary that applies to the
500      data actually written to the sink.  This is either temp_dict or
501      default_dict.  However, if TEMPORARY is not on, then temp_dict
502      does not apply.  So, we can set temp_dict to default_dict in this
503      case. */
504   if (!temporary)
505     {
506       temp_trns = n_trns;
507       temp_dict = &default_dict;
508     }
509
510   /* No cases passed to the procedure yet. */
511   case_count = 0;
512
513   /* The rest. */
514   prepare_for_writing ();
515   arrange_compaction ();
516   make_temp_case ();
517   vector_initialization ();
518   discard_ctl_stack ();
519   setup_filter ();
520   setup_lag ();
521
522   /* Debug output. */
523   debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
524                  vfm_source->name, vfm_sink->name));
525   debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
526                  "temp_dict->nval=%d, compaction_nval=%d, "
527                  "default_dict.nval=%d\n",
528                  vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
529                  compaction_nval, default_dict.nval));
530 }
531 \f
532 /* Closes the active file. */
533 static void
534 close_active_file (void)
535 {
536   /* Close the current case group. */
537   if (case_count && end_func != NULL)
538     end_func ();
539
540   /* Stop lagging (catch up?). */
541   if (n_lag)
542     {
543       int i;
544       
545       for (i = 0; i < n_lag; i++)
546         free (lag_queue[i]);
547       free (lag_queue);
548       n_lag = 0;
549     }
550   
551   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
552      off TEMPORARY. */
553   if (temporary)
554     {
555       restore_dictionary (temp_dict);
556       temp_dict = NULL;
557     }
558
559   /* The default dictionary assumes the compacted data size. */
560   default_dict.nval = compaction_nval;
561     
562   /* Old data sink --> New data source. */
563   if (vfm_source && vfm_source->destroy_source)
564     vfm_source->destroy_source ();
565   
566   vfm_source = vfm_sink;
567   vfm_source_info.ncases = vfm_sink_info.ncases;
568   vfm_source_info.nval = compaction_nval;
569   vfm_source_info.case_size = (sizeof (struct ccase)
570                                + (compaction_nval - 1) * sizeof (union value));
571   if (vfm_source->mode)
572     vfm_source->mode ();
573
574   /* Old data sink is gone now. */
575   vfm_sink = NULL;
576
577   /* Finish compaction. */
578   if (compaction_necessary)
579     finish_compaction ();
580   cancel_temporary ();
581
582   /* Free temporary cases. */
583   free (temp_case);
584   temp_case = NULL;
585
586   free (compaction_case);
587   compaction_case = NULL;
588
589   /* Cancel PROCESS IF. */
590   expr_free (process_if_expr);
591   process_if_expr = NULL;
592
593   /* Cancel FILTER if temporary. */
594   if (filter_index != -1 && !FILTER_before_TEMPORARY)
595     default_dict.filter_var[0] = 0;
596
597   /* Cancel transformations. */
598   cancel_transformations ();
599
600   /* Clear value-initialization vectors. */
601   vec_clear (&init_zero);
602   vec_clear (&init_blanks);
603   vec_clear (&reinit_sysmis);
604   vec_clear (&reinit_blanks);
605
606   /* Turn off case limiter. */
607   default_dict.N = 0;
608
609   /* Clear VECTOR vectors. */
610   {
611     int i;
612
613     for (i = 0; i < nvec; i++)
614       free (vec[i].v);
615     free (vec);
616     vec = NULL;
617     nvec = 0;
618   }
619
620   debug_printf (("vfm: procedure complete\n\n"));
621 }
622 \f
623 /* Disk case stream. */
624
625 /* Associated files. */
626 FILE *disk_source_file;
627 FILE *disk_sink_file;
628
629 /* Initializes the disk sink. */
630 static void
631 disk_stream_init (void)
632 {
633   disk_sink_file = tmpfile ();
634   if (!disk_sink_file)
635     {
636       msg (ME, _("An error occurred attempting to create a temporary "
637                  "file for use as the active file: %s."),
638            strerror (errno));
639       err_failure ();
640     }
641 }
642
643 /* Reads all cases from the disk source and passes them one by one to
644    write_case(). */
645 static void
646 disk_stream_read (void)
647 {
648   int i;
649
650   for (i = 0; i < vfm_source_info.ncases; i++)
651     {
652       if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
653         {
654           msg (ME, _("An error occurred while attempting to read from "
655                "a temporary file created for the active file: %s."),
656                strerror (errno));
657           err_failure ();
658           return;
659         }
660
661       if (!write_case ())
662         return;
663     }
664 }
665
666 /* Writes temp_case to the disk sink. */
667 static void
668 disk_stream_write (void)
669 {
670   union value *src_case;
671
672   if (compaction_necessary)
673     {
674       compact_case (compaction_case, temp_case);
675       src_case = (union value *) compaction_case;
676     }
677   else src_case = (union value *) temp_case;
678
679   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
680               disk_sink_file) != 1)
681     {
682       msg (ME, _("An error occurred while attempting to write to a "
683                  "temporary file used as the active file: %s."),
684            strerror (errno));
685       err_failure ();
686     }
687 }
688
689 /* Switches the stream from a sink to a source. */
690 static void
691 disk_stream_mode (void)
692 {
693   /* Rewind the sink. */
694   if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
695     {
696       msg (ME, _("An error occurred while attempting to rewind a "
697                  "temporary file used as the active file: %s."),
698            strerror (errno));
699       err_failure ();
700     }
701   
702   /* Sink --> source variables. */
703   disk_source_file = disk_sink_file;
704 }
705
706 /* Destroys the source's internal data. */
707 static void
708 disk_stream_destroy_source (void)
709 {
710   if (disk_source_file)
711     {
712       fclose (disk_source_file);
713       disk_source_file = NULL;
714     }
715 }
716
717 /* Destroys the sink's internal data. */
718 static void
719 disk_stream_destroy_sink (void)
720 {
721   if (disk_sink_file)
722     {
723       fclose (disk_sink_file);
724       disk_sink_file = NULL;
725     }
726 }
727
728 /* Disk stream. */
729 struct case_stream vfm_disk_stream = 
730   {
731     disk_stream_init,
732     disk_stream_read,
733     disk_stream_write,
734     disk_stream_mode,
735     disk_stream_destroy_source,
736     disk_stream_destroy_sink,
737     "disk",
738   };
739 \f
740 /* Memory case stream. */
741
742 /* List of cases stored in the stream. */
743 struct case_list *memory_source_cases;
744 struct case_list *memory_sink_cases;
745
746 /* Current case. */
747 struct case_list *memory_sink_iter;
748
749 /* Maximum number of cases. */
750 int memory_sink_max_cases;
751
752 /* Initializes the memory stream variables for writing. */
753 static void
754 memory_stream_init (void)
755 {
756   memory_sink_cases = NULL;
757   memory_sink_iter = NULL;
758   
759   assert (compaction_nval);
760   memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
761 }
762
763 /* Reads the case stream from memory and passes it to write_case(). */
764 static void
765 memory_stream_read (void)
766 {
767   while (memory_source_cases != NULL)
768     {
769       memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
770       
771       {
772         struct case_list *current = memory_source_cases;
773         memory_source_cases = memory_source_cases->next;
774         free (current);
775       }
776       
777       if (!write_case ())
778         return;
779     }
780 }
781
782 /* Writes temp_case to the memory stream. */
783 static void
784 memory_stream_write (void)
785 {
786   struct case_list *new_case = malloc (sizeof (struct case_list)
787                                        + ((compaction_nval - 1)
788                                           * sizeof (union value)));
789
790   /* If we've got memory to spare then add it to the linked list. */
791   if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
792     {
793       if (compaction_necessary)
794         compact_case (&new_case->c, temp_case);
795       else
796         memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
797
798       /* Append case to linked list. */
799       if (memory_sink_cases)
800         memory_sink_iter = memory_sink_iter->next = new_case;
801       else
802         memory_sink_iter = memory_sink_cases = new_case;
803     }
804   else
805     {
806       /* Out of memory.  Write the active file to disk. */
807       struct case_list *cur, *next;
808
809       /* Notify the user. */
810       if (!new_case)
811         msg (MW, _("Virtual memory exhausted.  Paging active file "
812                    "to disk."));
813       else
814         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
815                    "overflowed.  Paging active file to disk."),
816              MAX_WORKSPACE / 1024, memory_sink_max_cases,
817              compaction_nval * sizeof (union value));
818
819       free (new_case);
820
821       /* Switch to a disk sink. */
822       vfm_sink = &vfm_disk_stream;
823       vfm_sink->init ();
824       paging = 1;
825
826       /* Terminate the list. */
827       if (memory_sink_iter)
828         memory_sink_iter->next = NULL;
829
830       /* Write the cases to disk and destroy them.  We can't call
831          vfm->sink->write() because of compaction. */
832       for (cur = memory_sink_cases; cur; cur = next)
833         {
834           next = cur->next;
835           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
836                       disk_sink_file) != 1)
837             {
838               msg (ME, _("An error occurred while attempting to "
839                          "write to a temporary file created as the "
840                          "active file, while paging to disk: %s."),
841                    strerror (errno));
842               err_failure ();
843             }
844           free (cur);
845         }
846
847       /* Write the current case to disk. */
848       vfm_sink->write ();
849     }
850 }
851
852 /* If the data is stored in memory, causes it to be written to disk.
853    To be called only *between* procedure()s, not within them. */
854 void
855 page_to_disk (void)
856 {
857   if (vfm_source == &vfm_memory_stream)
858     {
859       /* Switch to a disk sink. */
860       vfm_sink = &vfm_disk_stream;
861       vfm_sink->init ();
862       paging = 1;
863       
864       /* Write the cases to disk and destroy them.  We can't call
865          vfm->sink->write() because of compaction. */
866       {
867         struct case_list *cur, *next;
868         
869         for (cur = memory_source_cases; cur; cur = next)
870           {
871             next = cur->next;
872             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
873                         disk_sink_file) != 1)
874               {
875                 msg (ME, _("An error occurred while attempting to "
876                            "write to a temporary file created as the "
877                            "active file, while paging to disk: %s."),
878                      strerror (errno));
879                 err_failure ();
880               }
881             free (cur);
882           }
883       }
884       
885       vfm_source = &vfm_disk_stream;
886       vfm_source->mode ();
887
888       vfm_sink = NULL;
889     }
890 }
891
892 /* Switch the memory stream from sink to source mode. */
893 static void
894 memory_stream_mode (void)
895 {
896   /* Terminate the list. */
897   if (memory_sink_iter)
898     memory_sink_iter->next = NULL;
899
900   /* Sink --> source variables. */
901   memory_source_cases = memory_sink_cases;
902   memory_sink_cases = NULL;
903 }
904
905 /* Destroy all memory source data. */
906 static void
907 memory_stream_destroy_source (void)
908 {
909   struct case_list *cur, *next;
910   
911   for (cur = memory_source_cases; cur; cur = next)
912     {
913       next = cur->next;
914       free (cur);
915     }
916   memory_source_cases = NULL;
917 }
918
919 /* Destroy all memory sink data. */
920 static void
921 memory_stream_destroy_sink (void)
922 {
923   struct case_list *cur, *next;
924   
925   for (cur = memory_sink_cases; cur; cur = next)
926     {
927       next = cur->next;
928       free (cur);
929     }
930   memory_sink_cases = NULL;
931 }
932   
933 /* Memory stream. */
934 struct case_stream vfm_memory_stream = 
935   {
936     memory_stream_init,
937     memory_stream_read,
938     memory_stream_write,
939     memory_stream_mode,
940     memory_stream_destroy_source,
941     memory_stream_destroy_sink,
942     "memory",
943   };
944 \f
945 #include "debug-print.h"
946
947 /* Add temp_case to the lag queue. */
948 static void
949 lag_case (void)
950 {
951   if (lag_count < n_lag)
952     lag_count++;
953   memcpy (lag_queue[lag_head], temp_case, sizeof (union value) * temp_dict->nval);
954   if (++lag_head >= n_lag)
955     lag_head = 0;
956 }
957
958 /* Returns a pointer to the lagged case from N_BEFORE cases before the
959    current one, or NULL if there haven't been that many cases yet. */
960 struct ccase *
961 lagged_case (int n_before)
962 {
963   assert (n_before <= n_lag);
964   if (n_before > lag_count)
965     return NULL;
966   
967   {
968     int index = lag_head - n_before;
969     if (index < 0)
970       index += n_lag;
971     return lag_queue[index];
972   }
973 }
974    
975 /* Transforms temp_case and writes it to the replacement active file
976    if advisable.  Returns nonzero if more cases can be accepted, zero
977    otherwise.  Do not call this function again after it has returned
978    zero once.  */
979 int
980 procedure_write_case (void)
981 {
982   /* Index of current transformation. */
983   int cur_trns;
984
985   /* Return value: whether it's reasonable to write any more cases. */
986   int more_cases = 1;
987
988   debug_printf ((_("transform: ")));
989
990   cur_trns = f_trns;
991   for (;;)
992     {
993       /* Output the case if this is temp_trns. */
994       if (cur_trns == temp_trns)
995         {
996           debug_printf (("REC"));
997
998           if (n_lag)
999             lag_case ();
1000           
1001           vfm_sink_info.ncases++;
1002           vfm_sink->write ();
1003
1004           if (default_dict.N)
1005             more_cases = vfm_sink_info.ncases < default_dict.N;
1006         }
1007
1008       /* Are we done? */
1009       if (cur_trns >= n_trns)
1010         break;
1011       
1012       debug_printf (("$%d", cur_trns));
1013
1014       /* Decide which transformation should come next. */
1015       {
1016         int code;
1017         
1018         code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
1019         switch (code)
1020           {
1021           case -1:
1022             /* Next transformation. */
1023             cur_trns++;
1024             break;
1025           case -2:
1026             /* Delete this case. */
1027             goto done;
1028           default:
1029             /* Go to that transformation. */
1030             cur_trns = code;
1031             break;
1032           }
1033       }
1034     }
1035
1036   /* Call the beginning of group function. */
1037   if (!case_count && begin_func != NULL)
1038     begin_func ();
1039
1040   /* Call the procedure if there is one and FILTER and PROCESS IF
1041      don't prohibit it. */
1042   if (proc_func != NULL
1043       && !FILTERED
1044       && (process_if_expr == NULL ||
1045           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
1046     proc_func (temp_case);
1047
1048   case_count++;
1049   
1050 done:
1051   debug_putc ('\n', stdout);
1052   
1053   {
1054     long *lp;
1055
1056     /* This case is finished.  Initialize the variables for the next case. */
1057     for (lp = reinit_sysmis.vec; *lp != -1;)
1058       temp_case->data[*lp++].f = SYSMIS;
1059     for (lp = reinit_blanks.vec; *lp != -1;)
1060       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
1061   }
1062   
1063   /* Return previously determined value. */
1064   return more_cases;
1065 }
1066
1067 /* Appends TRNS to t_trns[], the list of all transformations to be
1068    performed on data as it is read from the active file. */
1069 void
1070 add_transformation (struct trns_header * trns)
1071 {
1072   if (n_trns >= m_trns)
1073     {
1074       m_trns += 16;
1075       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1076     }
1077   t_trns[n_trns] = trns;
1078   trns->index = n_trns++;
1079 }
1080
1081 /* Cancels all active transformations, including any transformations
1082    created by the input program. */
1083 void
1084 cancel_transformations (void)
1085 {
1086   int i;
1087   for (i = 0; i < n_trns; i++)
1088     {
1089       if (t_trns[i]->free)
1090         t_trns[i]->free (t_trns[i]);
1091       free (t_trns[i]);
1092     }
1093   n_trns = f_trns = 0;
1094   if (m_trns > 32)
1095     {
1096       free (t_trns);
1097       m_trns = 0;
1098     }
1099 }
1100
1101 /* Dumps out the values of all the split variables for the case C. */
1102 static void
1103 dump_splits (struct ccase *c)
1104 {
1105   struct variable **iter;
1106   struct tab_table *t;
1107   int i;
1108
1109   t = tab_create (3, default_dict.n_splits + 1, 0);
1110   tab_dim (t, tab_natural_dimensions);
1111   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, default_dict.n_splits);
1112   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, default_dict.n_splits);
1113   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1114   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1115   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1116   for (iter = default_dict.splits, i = 0; *iter; iter++, i++)
1117     {
1118       struct variable *v = *iter;
1119       char temp_buf[80];
1120       const char *val_lab;
1121
1122       assert (v->type == NUMERIC || v->type == ALPHA);
1123       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1124       
1125       {
1126         union value val = c->data[v->fv];
1127         if (v->type == ALPHA)
1128           val.c = c->data[v->fv].s;
1129         data_out (temp_buf, &v->print, &val);
1130       }
1131       
1132       temp_buf[v->print.w] = 0;
1133       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1134
1135       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1136       if (val_lab)
1137         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1138     }
1139   tab_flags (t, SOMF_NO_TITLE);
1140   tab_submit (t);
1141 }
1142
1143 /* This procfunc is substituted for the user-supplied procfunc when
1144    SPLIT FILE is active.  This function forms a wrapper around that
1145    procfunc by dividing the input into series. */
1146 static int
1147 SPLIT_FILE_procfunc (struct ccase *c)
1148 {
1149   static struct ccase *prev_case;
1150   struct variable **iter;
1151
1152   /* The first case always begins a new series.  We also need to
1153      preserve the values of the case for later comparison. */
1154   if (case_count == 0)
1155     {
1156       if (prev_case)
1157         free (prev_case);
1158       prev_case = xmalloc (vfm_sink_info.case_size);
1159       memcpy (prev_case, c, vfm_sink_info.case_size);
1160
1161       dump_splits (c);
1162       if (virt_begin_func != NULL)
1163         virt_begin_func ();
1164       
1165       return virt_proc_func (c);
1166     }
1167
1168   /* Compare the value of each SPLIT FILE variable to the values on
1169      the previous case. */
1170   for (iter = default_dict.splits; *iter; iter++)
1171     {
1172       struct variable *v = *iter;
1173       
1174       switch (v->type)
1175         {
1176         case NUMERIC:
1177           if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
1178             goto not_equal;
1179           break;
1180         case ALPHA:
1181           if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
1182             goto not_equal;
1183           break;
1184         default:
1185           assert (0);
1186         }
1187     }
1188   return virt_proc_func (c);
1189   
1190 not_equal:
1191   /* The values of the SPLIT FILE variable are different from the
1192      values on the previous case.  That means that it's time to begin
1193      a new series. */
1194   if (end_func != NULL)
1195     end_func ();
1196   dump_splits (c);
1197   if (virt_begin_func != NULL)
1198     virt_begin_func ();
1199   memcpy (prev_case, c, vfm_sink_info.case_size);
1200   return virt_proc_func (c);
1201 }
1202 \f
1203 /* Case compaction. */
1204
1205 /* Copies case SRC to case DEST, compacting it in the process. */
1206 void
1207 compact_case (struct ccase *dest, const struct ccase *src)
1208 {
1209   int i;
1210   int nval = 0;
1211   
1212   assert (compaction_necessary);
1213
1214   if (temporary == 2)
1215     {
1216       if (dest != compaction_case)
1217         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1218       return;
1219     }
1220
1221   /* Copy all the variables except the scratch variables from SRC to
1222      DEST. */
1223   for (i = 0; i < default_dict.nvar; i++)
1224     {
1225       struct variable *v = default_dict.var[i];
1226       
1227       if (v->name[0] == '#')
1228         continue;
1229
1230       if (v->type == NUMERIC)
1231         dest->data[nval++] = src->data[v->fv];
1232       else
1233         {
1234           int w = DIV_RND_UP (v->width, sizeof (union value));
1235           
1236           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1237           nval += w;
1238         }
1239     }
1240 }
1241
1242 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1243 static void
1244 finish_compaction (void)
1245 {
1246   int copy_index = 0;
1247   int nval = 0;
1248   int i;
1249
1250   for (i = 0; i < default_dict.nvar; i++)
1251     {
1252       struct variable *v = default_dict.var[i];
1253
1254       if (v->name[0] == '#')
1255         {
1256           clear_variable (&default_dict, v);
1257           free (v);
1258           continue;
1259         }
1260
1261       v->fv = nval;
1262       if (v->type == NUMERIC)
1263         nval++;
1264       else
1265         nval += DIV_RND_UP (v->width, sizeof (union value));
1266       
1267       default_dict.var[copy_index++] = v;
1268     }
1269   if (copy_index != default_dict.nvar)
1270     {
1271       default_dict.var = xrealloc (default_dict.var,
1272                                    sizeof *default_dict.var * copy_index);
1273       default_dict.nvar = copy_index;
1274     }
1275 }
1276
1277