Prevent procedure() from being called recursively.
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include <assert.h>
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "approx.h"
32 #include "do-ifP.h"
33 #include "error.h"
34 #include "expr.h"
35 #include "misc.h"
36 #include "random.h"
37 #include "som.h"
38 #include "str.h"
39 #include "tab.h"
40 #include "var.h"
41 #include "value-labels.h"
42
43 /*
44    Virtual File Manager (vfm):
45
46    vfm is used to process data files.  It uses the model that data is
47    read from one stream (the data source), then written to another
48    (the data sink).  The data source is then deleted and the data sink
49    becomes the data source for the next procedure. */
50
51 #include "debug-print.h"
52
53 /* Procedure execution data. */
54 struct write_case_data
55   {
56     void (*beginfunc) (void *);
57     int (*procfunc) (struct ccase *, void *);
58     void (*endfunc) (void *);
59     void *aux;
60   };
61
62 /* This is used to read from the active file. */
63 struct case_stream *vfm_source;
64
65 /* This is used to write to the replacement active file. */
66 struct case_stream *vfm_sink;
67
68 /* Information about the data source. */
69 struct stream_info vfm_source_info;
70
71 /* Information about the data sink. */
72 struct stream_info vfm_sink_info;
73
74 /* Nonzero if the case needs to have values deleted before being
75    stored, zero otherwise. */
76 int compaction_necessary;
77
78 /* Number of values after compaction, or the same as
79    vfm_sink_info.nval, if compaction is not necessary. */
80 int compaction_nval;
81
82 /* Temporary case buffer with enough room for `compaction_nval'
83    `value's. */
84 struct ccase *compaction_case;
85
86 /* Within a session, when paging is turned on, it is never turned back
87    off.  This policy might be too aggressive. */
88 static int paging = 0;
89
90 /* Time at which vfm was last invoked. */
91 time_t last_vfm_invocation;
92
93 /* Number of cases passed to proc_func(). */
94 static int case_count;
95
96 /* Lag queue. */
97 int n_lag;                      /* Number of cases to lag. */
98 static int lag_count;           /* Number of cases in lag_queue so far. */
99 static int lag_head;            /* Index where next case will be added. */
100 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
101
102 static void open_active_file (void);
103 static void close_active_file (struct write_case_data *);
104 static int SPLIT_FILE_procfunc (struct ccase *, void *);
105 static void finish_compaction (void);
106 static void lag_case (void);
107 static int procedure_write_case (struct write_case_data *);
108 static void clear_temp_case (void);
109 static int exclude_this_case (void);
110 \f
111 /* Public functions. */
112
113 /* Reads all the cases from the active file, transforms them by
114    the active set of transformations, calls PROCFUNC with CURCASE
115    set to the case, and writes them to a new active file.
116
117    Divides the active file into zero or more series of one or more
118    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
119    called after each series.
120
121    Arbitrary user-specified data AUX is passed to BEGINFUNC,
122    PROCFUNC, and ENDFUNC as auxiliary data. */
123 void
124 procedure (void (*beginfunc) (void *),
125            int (*procfunc) (struct ccase *curcase, void *),
126            void (*endfunc) (void *),
127            void *aux)
128 {
129   static int recursive_call;
130
131   struct write_case_data procedure_write_data;
132   struct write_case_data split_file_data;
133
134   assert (++recursive_call == 1);
135
136   if (dict_get_split_cnt (default_dict) == 0) 
137     {
138       /* Normally we just use the data passed by the user. */
139       procedure_write_data.beginfunc = beginfunc;
140       procedure_write_data.procfunc = procfunc;
141       procedure_write_data.endfunc = endfunc;
142       procedure_write_data.aux = aux;
143     }
144   else
145     {
146       /* Under SPLIT FILE, we add a layer of indirection. */
147       procedure_write_data.beginfunc = NULL;
148       procedure_write_data.procfunc = SPLIT_FILE_procfunc;
149       procedure_write_data.endfunc = endfunc;
150       procedure_write_data.aux = &split_file_data;
151
152       split_file_data.beginfunc = beginfunc;
153       split_file_data.procfunc = procfunc;
154       split_file_data.endfunc = endfunc;
155       split_file_data.aux = aux;
156     }
157
158   last_vfm_invocation = time (NULL);
159
160   open_active_file ();
161   vfm_source->read (procedure_write_case, &procedure_write_data);
162   close_active_file (&procedure_write_data);
163
164   assert (--recursive_call == 0);
165 }
166 \f
167 /* Active file processing support.  Subtly different semantics from
168    procedure(). */
169
170 static int process_active_file_write_case (struct write_case_data *data);
171
172 /* The casefunc might want us to stop calling it. */
173 static int not_canceled;
174
175 /* Reads all the cases from the active file and passes them one-by-one
176    to CASEFUNC in temp_case.  Before any cases are passed, calls
177    BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
178    BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
179    file by calling process_active_file_output_case().
180
181    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
182 void
183 process_active_file (void (*beginfunc) (void *),
184                      int (*casefunc) (struct ccase *curcase, void *),
185                      void (*endfunc) (void *),
186                      void *aux)
187 {
188   struct write_case_data process_active_write_data;
189
190   process_active_write_data.beginfunc = beginfunc;
191   process_active_write_data.procfunc = casefunc;
192   process_active_write_data.endfunc = endfunc;
193   process_active_write_data.aux = aux;
194
195   not_canceled = 1;
196
197   open_active_file ();
198   beginfunc (aux);
199   
200   /* There doesn't necessarily need to be an active file. */
201   if (vfm_source)
202     vfm_source->read (process_active_file_write_case,
203                       &process_active_write_data);
204   
205   endfunc (aux);
206   close_active_file (&process_active_write_data);
207 }
208
209 /* Pass the current case to casefunc. */
210 static int
211 process_active_file_write_case (struct write_case_data *data)
212 {
213   /* Index of current transformation. */
214   int cur_trns;
215
216   for (cur_trns = f_trns ; cur_trns != temp_trns; )
217     {
218       int code;
219         
220       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
221       switch (code)
222         {
223         case -1:
224           /* Next transformation. */
225           cur_trns++;
226           break;
227         case -2:
228           /* Delete this case. */
229           goto done;
230         default:
231           /* Go to that transformation. */
232           cur_trns = code;
233           break;
234         }
235     }
236
237   if (n_lag)
238     lag_case ();
239           
240   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
241   if (not_canceled && !exclude_this_case ())
242     not_canceled = data->procfunc (temp_case, data->aux);
243   
244   case_count++;
245   
246  done:
247   clear_temp_case ();
248
249   return 1;
250 }
251
252 /* Write temp_case to the active file. */
253 void
254 process_active_file_output_case (void)
255 {
256   vfm_sink_info.ncases++;
257   vfm_sink->write ();
258 }
259 \f
260 /* Opening the active file. */
261
262 /* It might be usefully noted that the following several functions are
263    given in the order that they are called by open_active_file(). */
264
265 /* Prepare to write to the replacement active file. */
266 static void
267 prepare_for_writing (void)
268 {
269   /* FIXME: If ALL the conditions listed below hold true, then the
270      replacement active file is guaranteed to be identical to the
271      original active file:
272
273      1. TEMPORARY was the first transformation, OR, there were no
274      transformations at all.
275
276      2. Input is not coming from an input program.
277
278      3. Compaction is not necessary.
279
280      So, in this case, we shouldn't have to replace the active
281      file--it's just a waste of time and space. */
282
283   vfm_sink_info.ncases = 0;
284   vfm_sink_info.nval = dict_get_next_value_idx (default_dict);
285   vfm_sink_info.case_size = dict_get_case_size (default_dict);
286   
287   if (vfm_sink == NULL)
288     {
289       if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
290           && !paging)
291         {
292           msg (MW, _("Workspace overflow predicted.  Max workspace is "
293                      "currently set to %d KB (%d cases at %d bytes each).  "
294                      "Paging active file to disk."),
295                MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
296                vfm_sink_info.case_size);
297           
298           paging = 1;
299         }
300       
301       vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
302     }
303 }
304
305 /* Arrange for compacting the output cases for storage. */
306 static void
307 arrange_compaction (void)
308 {
309   int count_values = 0;
310
311   {
312     int i;
313     
314     /* Count up the number of `value's that will be output. */
315     for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
316       {
317         struct variable *v = dict_get_var (temp_dict, i);
318
319         if (v->name[0] != '#')
320           {
321             assert (v->nv > 0);
322             count_values += v->nv;
323           } 
324       }
325     assert (temporary == 2
326             || count_values <= dict_get_next_value_idx (temp_dict));
327   }
328   
329   /* Compaction is only necessary if the number of `value's to output
330      differs from the number already present. */
331   compaction_nval = count_values;
332   if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
333     compaction_necessary = 1;
334   else
335     compaction_necessary = 0;
336   
337   if (vfm_sink->init)
338     vfm_sink->init ();
339 }
340
341 /* Prepares the temporary case and compaction case. */
342 static void
343 make_temp_case (void)
344 {
345   temp_case = xmalloc (vfm_sink_info.case_size);
346
347   if (compaction_necessary)
348     compaction_case = xmalloc (sizeof (struct ccase)
349                                + sizeof (union value) * (compaction_nval - 1));
350 }
351
352 #if DEBUGGING
353 /* Returns the name of the variable that owns the index CCASE_INDEX
354    into ccase. */
355 static const char *
356 index_to_varname (int ccase_index)
357 {
358   int i;
359
360   for (i = 0; i < default_dict.nvar; i++)
361     {
362       struct variable *v = default_dict.var[i];
363       
364       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
365         return default_dict.var[i]->name;
366     }
367   return _("<NOVAR>");
368 }
369 #endif
370
371 /* Initializes temp_case from the vectors that say which `value's
372    need to be initialized just once, and which ones need to be
373    re-initialized before every case. */
374 static void
375 vector_initialization (void)
376 {
377   size_t var_cnt = dict_get_var_cnt (default_dict);
378   size_t i;
379   
380   for (i = 0; i < var_cnt; i++) 
381     {
382       struct variable *v = dict_get_var (default_dict, i);
383
384       if (v->type == NUMERIC) 
385         {
386           if (v->reinit)
387             temp_case->data[v->fv].f = 0.0;
388           else
389             temp_case->data[v->fv].f = SYSMIS;
390         }
391       else
392         memset (temp_case->data[v->fv].s, ' ', v->width);
393     }
394 }
395
396 /* Sets all the lag-related variables based on value of n_lag. */
397 static void
398 setup_lag (void)
399 {
400   int i;
401   
402   if (n_lag == 0)
403     return;
404
405   lag_count = 0;
406   lag_head = 0;
407   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
408   for (i = 0; i < n_lag; i++)
409     lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
410 }
411
412 /* There is a lot of potential confusion in the vfm and related
413    routines over the number of `value's at each stage of the process.
414    Here is each nval count, with explanation, as set up by
415    open_active_file():
416
417    vfm_source_info.nval: Number of `value's in the cases returned by
418    the source stream.  This value turns out not to be very useful, but
419    we maintain it anyway.
420
421    vfm_sink_info.nval: Number of `value's in the cases after all
422    transformations have been performed.  Never less than
423    vfm_source_info.nval.
424
425    temp_dict->nval: Number of `value's in the cases after the
426    transformations leading up to TEMPORARY have been performed.  If
427    TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
428    Never less than vfm_sink_info.nval.
429
430    compaction_nval: Number of `value's in the cases after the
431    transformations leading up to TEMPORARY have been performed and the
432    case has been compacted by compact_case(), if compaction is
433    necessary.  This the number of `value's in the cases saved by the
434    sink stream.  (However, note that the cases passed to the sink
435    stream have not yet been compacted.  It is the responsibility of
436    the data sink to call compact_case().)  This may be less than,
437    greater than, or equal to vfm_source_info.nval.  `compaction'
438    becomes the new value of default_dict.nval after the procedure is
439    completed.
440
441    default_dict.nval: This is often an alias for temp_dict->nval.  As
442    such it can really have no separate existence until the procedure
443    is complete.  For this reason it should *not* be referenced inside
444    the execution of a procedure. */
445 /* Makes all preparations for reading from the data source and writing
446    to the data sink. */
447 static void
448 open_active_file (void)
449 {
450   /* Sometimes we want to refer to the dictionary that applies to the
451      data actually written to the sink.  This is either temp_dict or
452      default_dict.  However, if TEMPORARY is not on, then temp_dict
453      does not apply.  So, we can set temp_dict to default_dict in this
454      case. */
455   if (!temporary)
456     {
457       temp_trns = n_trns;
458       temp_dict = default_dict;
459     }
460
461   /* No cases passed to the procedure yet. */
462   case_count = 0;
463
464   /* The rest. */
465   prepare_for_writing ();
466   arrange_compaction ();
467   make_temp_case ();
468   vector_initialization ();
469   discard_ctl_stack ();
470   setup_lag ();
471
472   /* Debug output. */
473   debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
474                  vfm_source->name, vfm_sink->name));
475   debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
476                  "temp_dict->nval=%d, compaction_nval=%d, "
477                  "default_dict.nval=%d\n",
478                  vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
479                  compaction_nval, default_dict.nval));
480 }
481 \f
482 /* Closes the active file. */
483 static void
484 close_active_file (struct write_case_data *data)
485 {
486   /* Close the current case group. */
487   if (case_count && data->endfunc != NULL)
488     data->endfunc (data->aux);
489
490   /* Stop lagging (catch up?). */
491   if (n_lag)
492     {
493       int i;
494       
495       for (i = 0; i < n_lag; i++)
496         free (lag_queue[i]);
497       free (lag_queue);
498       n_lag = 0;
499     }
500   
501   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
502      off TEMPORARY. */
503   if (temporary)
504     {
505       dict_destroy (default_dict);
506       default_dict = temp_dict;
507       temp_dict = NULL;
508     }
509
510   /* Finish compaction. */
511   if (compaction_necessary)
512     finish_compaction ();
513     
514   /* Old data sink --> New data source. */
515   if (vfm_source && vfm_source->destroy_source)
516     vfm_source->destroy_source ();
517   
518   vfm_source = vfm_sink;
519   vfm_source_info.ncases = vfm_sink_info.ncases;
520   vfm_source_info.nval = compaction_nval;
521   vfm_source_info.case_size = (sizeof (struct ccase)
522                                + (compaction_nval - 1) * sizeof (union value));
523   if (vfm_source->mode)
524     vfm_source->mode ();
525
526   /* Old data sink is gone now. */
527   vfm_sink = NULL;
528
529   /* Cancel TEMPORARY. */
530   cancel_temporary ();
531
532   /* Free temporary cases. */
533   free (temp_case);
534   temp_case = NULL;
535
536   free (compaction_case);
537   compaction_case = NULL;
538
539   /* Cancel PROCESS IF. */
540   expr_free (process_if_expr);
541   process_if_expr = NULL;
542
543   /* Cancel FILTER if temporary. */
544   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
545     dict_set_filter (default_dict, NULL);
546
547   /* Cancel transformations. */
548   cancel_transformations ();
549
550   /* Turn off case limiter. */
551   dict_set_case_limit (default_dict, 0);
552
553   /* Clear VECTOR vectors. */
554   dict_clear_vectors (default_dict);
555
556   debug_printf (("vfm: procedure complete\n\n"));
557 }
558 \f
559 /* Disk case stream. */
560
561 /* Associated files. */
562 FILE *disk_source_file;
563 FILE *disk_sink_file;
564
565 /* Initializes the disk sink. */
566 static void
567 disk_stream_init (void)
568 {
569   disk_sink_file = tmpfile ();
570   if (!disk_sink_file)
571     {
572       msg (ME, _("An error occurred attempting to create a temporary "
573                  "file for use as the active file: %s."),
574            strerror (errno));
575       err_failure ();
576     }
577 }
578
579 /* Reads all cases from the disk source and passes them one by one to
580    write_case(). */
581 static void
582 disk_stream_read (write_case_func *write_case, write_case_data wc_data)
583 {
584   int i;
585
586   for (i = 0; i < vfm_source_info.ncases; i++)
587     {
588       if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
589         {
590           msg (ME, _("An error occurred while attempting to read from "
591                "a temporary file created for the active file: %s."),
592                strerror (errno));
593           err_failure ();
594           return;
595         }
596
597       if (!write_case (wc_data))
598         return;
599     }
600 }
601
602 /* Writes temp_case to the disk sink. */
603 static void
604 disk_stream_write (void)
605 {
606   union value *src_case;
607
608   if (compaction_necessary)
609     {
610       compact_case (compaction_case, temp_case);
611       src_case = (union value *) compaction_case;
612     }
613   else src_case = (union value *) temp_case;
614
615   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
616               disk_sink_file) != 1)
617     {
618       msg (ME, _("An error occurred while attempting to write to a "
619                  "temporary file used as the active file: %s."),
620            strerror (errno));
621       err_failure ();
622     }
623 }
624
625 /* Switches the stream from a sink to a source. */
626 static void
627 disk_stream_mode (void)
628 {
629   /* Rewind the sink. */
630   if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
631     {
632       msg (ME, _("An error occurred while attempting to rewind a "
633                  "temporary file used as the active file: %s."),
634            strerror (errno));
635       err_failure ();
636     }
637   
638   /* Sink --> source variables. */
639   disk_source_file = disk_sink_file;
640 }
641
642 /* Destroys the source's internal data. */
643 static void
644 disk_stream_destroy_source (void)
645 {
646   if (disk_source_file)
647     {
648       fclose (disk_source_file);
649       disk_source_file = NULL;
650     }
651 }
652
653 /* Destroys the sink's internal data. */
654 static void
655 disk_stream_destroy_sink (void)
656 {
657   if (disk_sink_file)
658     {
659       fclose (disk_sink_file);
660       disk_sink_file = NULL;
661     }
662 }
663
664 /* Disk stream. */
665 struct case_stream vfm_disk_stream = 
666   {
667     disk_stream_init,
668     disk_stream_read,
669     disk_stream_write,
670     disk_stream_mode,
671     disk_stream_destroy_source,
672     disk_stream_destroy_sink,
673     "disk",
674   };
675 \f
676 /* Memory case stream. */
677
678 /* List of cases stored in the stream. */
679 struct case_list *memory_source_cases;
680 struct case_list *memory_sink_cases;
681
682 /* Current case. */
683 struct case_list *memory_sink_iter;
684
685 /* Maximum number of cases. */
686 int memory_sink_max_cases;
687
688 /* Initializes the memory stream variables for writing. */
689 static void
690 memory_stream_init (void)
691 {
692   memory_sink_cases = NULL;
693   memory_sink_iter = NULL;
694   
695   assert (compaction_nval);
696   memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
697 }
698
699 /* Reads the case stream from memory and passes it to write_case(). */
700 static void
701 memory_stream_read (write_case_func *write_case, write_case_data wc_data)
702 {
703   while (memory_source_cases != NULL)
704     {
705       memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
706       
707       {
708         struct case_list *current = memory_source_cases;
709         memory_source_cases = memory_source_cases->next;
710         free (current);
711       }
712       
713       if (!write_case (wc_data))
714         return;
715     }
716 }
717
718 /* Writes temp_case to the memory stream. */
719 static void
720 memory_stream_write (void)
721 {
722   struct case_list *new_case = malloc (sizeof (struct case_list)
723                                        + ((compaction_nval - 1)
724                                           * sizeof (union value)));
725
726   /* If we've got memory to spare then add it to the linked list. */
727   if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
728     {
729       if (compaction_necessary)
730         compact_case (&new_case->c, temp_case);
731       else
732         memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
733
734       /* Append case to linked list. */
735       if (memory_sink_cases)
736         memory_sink_iter = memory_sink_iter->next = new_case;
737       else
738         memory_sink_iter = memory_sink_cases = new_case;
739     }
740   else
741     {
742       /* Out of memory.  Write the active file to disk. */
743       struct case_list *cur, *next;
744
745       /* Notify the user. */
746       if (!new_case)
747         msg (MW, _("Virtual memory exhausted.  Paging active file "
748                    "to disk."));
749       else
750         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
751                    "overflowed.  Paging active file to disk."),
752              MAX_WORKSPACE / 1024, memory_sink_max_cases,
753              compaction_nval * sizeof (union value));
754
755       free (new_case);
756
757       /* Switch to a disk sink. */
758       vfm_sink = &vfm_disk_stream;
759       vfm_sink->init ();
760       paging = 1;
761
762       /* Terminate the list. */
763       if (memory_sink_iter)
764         memory_sink_iter->next = NULL;
765
766       /* Write the cases to disk and destroy them.  We can't call
767          vfm->sink->write() because of compaction. */
768       for (cur = memory_sink_cases; cur; cur = next)
769         {
770           next = cur->next;
771           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
772                       disk_sink_file) != 1)
773             {
774               msg (ME, _("An error occurred while attempting to "
775                          "write to a temporary file created as the "
776                          "active file, while paging to disk: %s."),
777                    strerror (errno));
778               err_failure ();
779             }
780           free (cur);
781         }
782
783       /* Write the current case to disk. */
784       vfm_sink->write ();
785     }
786 }
787
788 /* If the data is stored in memory, causes it to be written to disk.
789    To be called only *between* procedure()s, not within them. */
790 void
791 page_to_disk (void)
792 {
793   if (vfm_source == &vfm_memory_stream)
794     {
795       /* Switch to a disk sink. */
796       vfm_sink = &vfm_disk_stream;
797       vfm_sink->init ();
798       paging = 1;
799       
800       /* Write the cases to disk and destroy them.  We can't call
801          vfm->sink->write() because of compaction. */
802       {
803         struct case_list *cur, *next;
804         
805         for (cur = memory_source_cases; cur; cur = next)
806           {
807             next = cur->next;
808             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
809                         disk_sink_file) != 1)
810               {
811                 msg (ME, _("An error occurred while attempting to "
812                            "write to a temporary file created as the "
813                            "active file, while paging to disk: %s."),
814                      strerror (errno));
815                 err_failure ();
816               }
817             free (cur);
818           }
819       }
820       
821       vfm_source = &vfm_disk_stream;
822       vfm_source->mode ();
823
824       vfm_sink = NULL;
825     }
826 }
827
828 /* Switch the memory stream from sink to source mode. */
829 static void
830 memory_stream_mode (void)
831 {
832   /* Terminate the list. */
833   if (memory_sink_iter)
834     memory_sink_iter->next = NULL;
835
836   /* Sink --> source variables. */
837   memory_source_cases = memory_sink_cases;
838   memory_sink_cases = NULL;
839 }
840
841 /* Destroy all memory source data. */
842 static void
843 memory_stream_destroy_source (void)
844 {
845   struct case_list *cur, *next;
846   
847   for (cur = memory_source_cases; cur; cur = next)
848     {
849       next = cur->next;
850       free (cur);
851     }
852   memory_source_cases = NULL;
853 }
854
855 /* Destroy all memory sink data. */
856 static void
857 memory_stream_destroy_sink (void)
858 {
859   struct case_list *cur, *next;
860   
861   for (cur = memory_sink_cases; cur; cur = next)
862     {
863       next = cur->next;
864       free (cur);
865     }
866   memory_sink_cases = NULL;
867 }
868   
869 /* Memory stream. */
870 struct case_stream vfm_memory_stream = 
871   {
872     memory_stream_init,
873     memory_stream_read,
874     memory_stream_write,
875     memory_stream_mode,
876     memory_stream_destroy_source,
877     memory_stream_destroy_sink,
878     "memory",
879   };
880 \f
881 #include "debug-print.h"
882
883 /* Add temp_case to the lag queue. */
884 static void
885 lag_case (void)
886 {
887   if (lag_count < n_lag)
888     lag_count++;
889   memcpy (lag_queue[lag_head], temp_case,
890           dict_get_case_size (temp_dict));
891   if (++lag_head >= n_lag)
892     lag_head = 0;
893 }
894
895 /* Returns a pointer to the lagged case from N_BEFORE cases before the
896    current one, or NULL if there haven't been that many cases yet. */
897 struct ccase *
898 lagged_case (int n_before)
899 {
900   assert (n_before <= n_lag);
901   if (n_before > lag_count)
902     return NULL;
903   
904   {
905     int index = lag_head - n_before;
906     if (index < 0)
907       index += n_lag;
908     return lag_queue[index];
909   }
910 }
911    
912 /* Transforms temp_case and writes it to the replacement active file
913    if advisable.  Returns nonzero if more cases can be accepted, zero
914    otherwise.  Do not call this function again after it has returned
915    zero once.  */
916 int
917 procedure_write_case (write_case_data wc_data)
918 {
919   /* Index of current transformation. */
920   int cur_trns;
921
922   /* Return value: whether it's reasonable to write any more cases. */
923   int more_cases = 1;
924
925   debug_printf ((_("transform: ")));
926
927   cur_trns = f_trns;
928   for (;;)
929     {
930       /* Output the case if this is temp_trns. */
931       if (cur_trns == temp_trns)
932         {
933           debug_printf (("REC"));
934
935           if (n_lag)
936             lag_case ();
937           
938           vfm_sink_info.ncases++;
939           vfm_sink->write ();
940
941           if (dict_get_case_limit (default_dict))
942             more_cases = (vfm_sink_info.ncases
943                           < dict_get_case_limit (default_dict));
944         }
945
946       /* Are we done? */
947       if (cur_trns >= n_trns)
948         break;
949       
950       debug_printf (("$%d", cur_trns));
951
952       /* Decide which transformation should come next. */
953       {
954         int code;
955         
956         code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
957         switch (code)
958           {
959           case -1:
960             /* Next transformation. */
961             cur_trns++;
962             break;
963           case -2:
964             /* Delete this case. */
965             goto done;
966           default:
967             /* Go to that transformation. */
968             cur_trns = code;
969             break;
970           }
971       }
972     }
973
974   /* Call the beginning of group function. */
975   if (!case_count && wc_data->beginfunc != NULL)
976     wc_data->beginfunc (wc_data->aux);
977
978   /* Call the procedure if there is one and FILTER and PROCESS IF
979      don't prohibit it. */
980   if (wc_data->procfunc != NULL && !exclude_this_case ())
981     wc_data->procfunc (temp_case, wc_data->aux);
982
983   case_count++;
984   
985 done:
986   debug_putc ('\n', stdout);
987
988   clear_temp_case ();
989   
990   /* Return previously determined value. */
991   return more_cases;
992 }
993
994 /* Clears the variables in the temporary case that need to be
995    cleared between processing cases.  */
996 static void
997 clear_temp_case (void)
998 {
999   /* FIXME?  This is linear in the number of variables, but
1000      doesn't need to be, so it's an easy optimization target. */
1001   size_t var_cnt = dict_get_var_cnt (default_dict);
1002   size_t i;
1003   
1004   for (i = 0; i < var_cnt; i++) 
1005     {
1006       struct variable *v = dict_get_var (default_dict, i);
1007       if (v->init && v->reinit) 
1008         {
1009           if (v->type == NUMERIC) 
1010             temp_case->data[v->fv].f = SYSMIS;
1011           else
1012             memset (temp_case->data[v->fv].s, ' ', v->width);
1013         } 
1014     }
1015 }
1016
1017 /* Returns nonzero if this case should be exclude as specified on
1018    FILTER or PROCESS IF, otherwise zero. */
1019 static int
1020 exclude_this_case (void)
1021 {
1022   /* FILTER. */
1023   struct variable *filter_var = dict_get_filter (default_dict);
1024   if (filter_var != NULL) 
1025     {
1026       double f = temp_case->data[filter_var->fv].f;
1027       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
1028         return 1;
1029     }
1030
1031   /* PROCESS IF. */
1032   if (process_if_expr != NULL
1033       && expr_evaluate (process_if_expr, temp_case, NULL) != 1.0)
1034     return 1;
1035
1036   return 0;
1037 }
1038
1039 /* Appends TRNS to t_trns[], the list of all transformations to be
1040    performed on data as it is read from the active file. */
1041 void
1042 add_transformation (struct trns_header * trns)
1043 {
1044   if (n_trns >= m_trns)
1045     {
1046       m_trns += 16;
1047       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1048     }
1049   t_trns[n_trns] = trns;
1050   trns->index = n_trns++;
1051 }
1052
1053 /* Cancels all active transformations, including any transformations
1054    created by the input program. */
1055 void
1056 cancel_transformations (void)
1057 {
1058   int i;
1059   for (i = 0; i < n_trns; i++)
1060     {
1061       if (t_trns[i]->free)
1062         t_trns[i]->free (t_trns[i]);
1063       free (t_trns[i]);
1064     }
1065   n_trns = f_trns = 0;
1066   if (m_trns > 32)
1067     {
1068       free (t_trns);
1069       m_trns = 0;
1070     }
1071 }
1072
1073 /* Dumps out the values of all the split variables for the case C. */
1074 static void
1075 dump_splits (struct ccase *c)
1076 {
1077   struct variable *const *split;
1078   struct tab_table *t;
1079   size_t split_cnt;
1080   int i;
1081
1082   split_cnt = dict_get_split_cnt (default_dict);
1083   t = tab_create (3, split_cnt + 1, 0);
1084   tab_dim (t, tab_natural_dimensions);
1085   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
1086   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
1087   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1088   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1089   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1090   split = dict_get_split_vars (default_dict);
1091   for (i = 0; i < split_cnt; i++)
1092     {
1093       struct variable *v = split[i];
1094       char temp_buf[80];
1095       const char *val_lab;
1096
1097       assert (v->type == NUMERIC || v->type == ALPHA);
1098       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1099       
1100       data_out (temp_buf, &v->print, &c->data[v->fv]);
1101       
1102       temp_buf[v->print.w] = 0;
1103       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1104
1105       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1106       if (val_lab)
1107         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1108     }
1109   tab_flags (t, SOMF_NO_TITLE);
1110   tab_submit (t);
1111 }
1112
1113 /* This procfunc is substituted for the user-supplied procfunc when
1114    SPLIT FILE is active.  This function forms a wrapper around that
1115    procfunc by dividing the input into series. */
1116 static int
1117 SPLIT_FILE_procfunc (struct ccase *c, void *data_)
1118 {
1119   struct write_case_data *data = data_;
1120   static struct ccase *prev_case;
1121   struct variable *const *split;
1122   size_t split_cnt;
1123   size_t i;
1124
1125   /* The first case always begins a new series.  We also need to
1126      preserve the values of the case for later comparison. */
1127   if (case_count == 0)
1128     {
1129       if (prev_case)
1130         free (prev_case);
1131       prev_case = xmalloc (vfm_sink_info.case_size);
1132       memcpy (prev_case, c, vfm_sink_info.case_size);
1133
1134       dump_splits (c);
1135       if (data->beginfunc != NULL)
1136         data->beginfunc (data->aux);
1137       
1138       return data->procfunc (c, data->aux);
1139     }
1140
1141   /* Compare the value of each SPLIT FILE variable to the values on
1142      the previous case. */
1143   split = dict_get_split_vars (default_dict);
1144   split_cnt = dict_get_split_cnt (default_dict);
1145   for (i = 0; i < split_cnt; i++)
1146     {
1147       struct variable *v = split[i];
1148       
1149       switch (v->type)
1150         {
1151         case NUMERIC:
1152           if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
1153             goto not_equal;
1154           break;
1155         case ALPHA:
1156           if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
1157             goto not_equal;
1158           break;
1159         default:
1160           assert (0);
1161         }
1162     }
1163   return data->procfunc (c, data->aux);
1164   
1165 not_equal:
1166   /* The values of the SPLIT FILE variable are different from the
1167      values on the previous case.  That means that it's time to begin
1168      a new series. */
1169   if (data->endfunc != NULL)
1170     data->endfunc (data->aux);
1171   dump_splits (c);
1172   if (data->beginfunc != NULL)
1173     data->beginfunc (data->aux);
1174   memcpy (prev_case, c, vfm_sink_info.case_size);
1175   return data->procfunc (c, data->aux);
1176 }
1177 \f
1178 /* Case compaction. */
1179
1180 /* Copies case SRC to case DEST, compacting it in the process. */
1181 void
1182 compact_case (struct ccase *dest, const struct ccase *src)
1183 {
1184   int i;
1185   int nval = 0;
1186   size_t var_cnt;
1187   
1188   assert (compaction_necessary);
1189
1190   if (temporary == 2)
1191     {
1192       if (dest != compaction_case)
1193         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1194       return;
1195     }
1196
1197   /* Copy all the variables except the scratch variables from SRC to
1198      DEST. */
1199   var_cnt = dict_get_var_cnt (default_dict);
1200   for (i = 0; i < var_cnt; i++)
1201     {
1202       struct variable *v = dict_get_var (default_dict, i);
1203       
1204       if (v->name[0] == '#')
1205         continue;
1206
1207       if (v->type == NUMERIC)
1208         dest->data[nval++] = src->data[v->fv];
1209       else
1210         {
1211           int w = DIV_RND_UP (v->width, sizeof (union value));
1212           
1213           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1214           nval += w;
1215         }
1216     }
1217 }
1218
1219 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1220 static void
1221 finish_compaction (void)
1222 {
1223   int i;
1224
1225   for (i = 0; i < dict_get_var_cnt (default_dict); )
1226     {
1227       struct variable *v = dict_get_var (default_dict, i);
1228
1229       if (v->name[0] == '#') 
1230         dict_delete_var (default_dict, v);
1231       else
1232         i++;
1233     }
1234   dict_compact_values (default_dict);
1235 }
1236
1237