Ignore more files.
[pspp] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include <assert.h>
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "approx.h"
32 #include "do-ifP.h"
33 #include "error.h"
34 #include "expr.h"
35 #include "misc.h"
36 #include "random.h"
37 #include "som.h"
38 #include "str.h"
39 #include "tab.h"
40 #include "var.h"
41 #include "value-labels.h"
42
43 /*
44    Virtual File Manager (vfm):
45
46    vfm is used to process data files.  It uses the model that data is
47    read from one stream (the data source), then written to another
48    (the data sink).  The data source is then deleted and the data sink
49    becomes the data source for the next procedure. */
50
51 #include "debug-print.h"
52
53 /* Procedure execution data. */
54 struct write_case_data
55   {
56     void (*beginfunc) (void *);
57     int (*procfunc) (struct ccase *, void *);
58     void (*endfunc) (void *);
59     void *aux;
60   };
61
62 /* This is used to read from the active file. */
63 struct case_stream *vfm_source;
64
65 /* `value' indexes to initialize to particular values for certain cases. */
66 struct long_vec reinit_sysmis;          /* SYSMIS for every case. */
67 struct long_vec reinit_blanks;          /* Blanks for every case. */
68 struct long_vec init_zero;              /* Zero for first case only. */
69 struct long_vec init_blanks;            /* Blanks for first case only. */
70
71 /* This is used to write to the replacement active file. */
72 struct case_stream *vfm_sink;
73
74 /* Information about the data source. */
75 struct stream_info vfm_source_info;
76
77 /* Information about the data sink. */
78 struct stream_info vfm_sink_info;
79
80 /* Filter variable and  `value' index. */
81 static struct variable *filter_var;
82 static int filter_index;
83
84 #define FILTERED                                                        \
85         (filter_index != -1                                             \
86          && (temp_case->data[filter_index].f == 0.0                     \
87              || temp_case->data[filter_index].f == SYSMIS               \
88              || is_num_user_missing (temp_case->data[filter_index].f,   \
89                                      filter_var)))
90
91 /* Nonzero if the case needs to have values deleted before being
92    stored, zero otherwise. */
93 int compaction_necessary;
94
95 /* Number of values after compaction, or the same as
96    vfm_sink_info.nval, if compaction is not necessary. */
97 int compaction_nval;
98
99 /* Temporary case buffer with enough room for `compaction_nval'
100    `value's. */
101 struct ccase *compaction_case;
102
103 /* Within a session, when paging is turned on, it is never turned back
104    off.  This policy might be too aggressive. */
105 static int paging = 0;
106
107 /* Time at which vfm was last invoked. */
108 time_t last_vfm_invocation;
109
110 /* Number of cases passed to proc_func(). */
111 static int case_count;
112
113 /* Lag queue. */
114 int n_lag;                      /* Number of cases to lag. */
115 static int lag_count;           /* Number of cases in lag_queue so far. */
116 static int lag_head;            /* Index where next case will be added. */
117 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
118
119 static void open_active_file (void);
120 static void close_active_file (struct write_case_data *);
121 static int SPLIT_FILE_procfunc (struct ccase *, void *);
122 static void finish_compaction (void);
123 static void lag_case (void);
124 static int procedure_write_case (struct write_case_data *);
125 \f
126 /* Public functions. */
127
128 /* Reads all the cases from the active file, transforms them by
129    the active set of transformations, calls PROCFUNC with CURCASE
130    set to the case , and writes them to a new active file.
131
132    Divides the active file into zero or more series of one or more
133    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
134    called after each series.
135
136    Arbitrary user-specified data AUX is passed to BEGINFUNC,
137    PROCFUNC, and ENDFUNC as auxiliary data. */
138 void
139 procedure (void (*beginfunc) (void *),
140            int (*procfunc) (struct ccase *curcase, void *),
141            void (*endfunc) (void *),
142            void *aux)
143 {
144   struct write_case_data procedure_write_data;
145   struct write_case_data split_file_data;
146
147   if (dict_get_split_cnt (default_dict) == 0) 
148     {
149       /* Normally we just use the data passed by the user. */
150       procedure_write_data.beginfunc = beginfunc;
151       procedure_write_data.procfunc = procfunc;
152       procedure_write_data.endfunc = endfunc;
153       procedure_write_data.aux = aux;
154     }
155   else
156     {
157       /* Under SPLIT FILE, we add a layer of indirection. */
158       procedure_write_data.beginfunc = NULL;
159       procedure_write_data.procfunc = SPLIT_FILE_procfunc;
160       procedure_write_data.endfunc = endfunc;
161       procedure_write_data.aux = &split_file_data;
162
163       split_file_data.beginfunc = beginfunc;
164       split_file_data.procfunc = procfunc;
165       split_file_data.endfunc = endfunc;
166       split_file_data.aux = aux;
167     }
168
169   last_vfm_invocation = time (NULL);
170
171   open_active_file ();
172   vfm_source->read (procedure_write_case, &procedure_write_data);
173   close_active_file (&procedure_write_data);
174 }
175 \f
176 /* Active file processing support.  Subtly different semantics from
177    procedure(). */
178
179 static int process_active_file_write_case (struct write_case_data *data);
180
181 /* The casefunc might want us to stop calling it. */
182 static int not_canceled;
183
184 /* Reads all the cases from the active file and passes them one-by-one
185    to CASEFUNC in temp_case.  Before any cases are passed, calls
186    BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
187    BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
188    file by calling process_active_file_output_case().
189
190    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
191 void
192 process_active_file (void (*beginfunc) (void *),
193                      int (*casefunc) (struct ccase *curcase, void *),
194                      void (*endfunc) (void *),
195                      void *aux)
196 {
197   struct write_case_data process_active_write_data;
198
199   process_active_write_data.beginfunc = beginfunc;
200   process_active_write_data.procfunc = casefunc;
201   process_active_write_data.endfunc = endfunc;
202   process_active_write_data.aux = aux;
203
204   not_canceled = 1;
205
206   open_active_file ();
207   beginfunc (aux);
208   
209   /* There doesn't necessarily need to be an active file. */
210   if (vfm_source)
211     vfm_source->read (process_active_file_write_case,
212                       &process_active_write_data);
213   
214   endfunc (aux);
215   close_active_file (&process_active_write_data);
216 }
217
218 /* Pass the current case to casefunc. */
219 static int
220 process_active_file_write_case (struct write_case_data *data)
221 {
222   /* Index of current transformation. */
223   int cur_trns;
224
225   for (cur_trns = f_trns ; cur_trns != temp_trns; )
226     {
227       int code;
228         
229       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
230       switch (code)
231         {
232         case -1:
233           /* Next transformation. */
234           cur_trns++;
235           break;
236         case -2:
237           /* Delete this case. */
238           goto done;
239         default:
240           /* Go to that transformation. */
241           cur_trns = code;
242           break;
243         }
244     }
245
246   if (n_lag)
247     lag_case ();
248           
249   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
250   if (not_canceled
251       && !FILTERED
252       && (process_if_expr == NULL ||
253           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
254     not_canceled = data->procfunc (temp_case, data->aux);
255   
256   case_count++;
257   
258  done:
259   {
260     long *lp;
261
262     /* This case is finished.  Initialize the variables for the next case. */
263     for (lp = reinit_sysmis.vec; *lp != -1;)
264       temp_case->data[*lp++].f = SYSMIS;
265     for (lp = reinit_blanks.vec; *lp != -1;)
266       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
267   }
268   
269   return 1;
270 }
271
272 /* Write temp_case to the active file. */
273 void
274 process_active_file_output_case (void)
275 {
276   vfm_sink_info.ncases++;
277   vfm_sink->write ();
278 }
279 \f
280 /* Opening the active file. */
281
282 /* It might be usefully noted that the following several functions are
283    given in the order that they are called by open_active_file(). */
284
285 /* Prepare to write to the replacement active file. */
286 static void
287 prepare_for_writing (void)
288 {
289   /* FIXME: If ALL the conditions listed below hold true, then the
290      replacement active file is guaranteed to be identical to the
291      original active file:
292
293      1. TEMPORARY was the first transformation, OR, there were no
294      transformations at all.
295
296      2. Input is not coming from an input program.
297
298      3. Compaction is not necessary.
299
300      So, in this case, we shouldn't have to replace the active
301      file--it's just a waste of time and space. */
302
303   vfm_sink_info.ncases = 0;
304   vfm_sink_info.nval = dict_get_value_cnt (default_dict);
305   vfm_sink_info.case_size = (sizeof (struct ccase)
306                              + ((dict_get_value_cnt (default_dict) - 1)
307                                 * sizeof (union value)));
308   
309   if (vfm_sink == NULL)
310     {
311       if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
312           && !paging)
313         {
314           msg (MW, _("Workspace overflow predicted.  Max workspace is "
315                      "currently set to %d KB (%d cases at %d bytes each).  "
316                      "Paging active file to disk."),
317                MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
318                vfm_sink_info.case_size);
319           
320           paging = 1;
321         }
322       
323       vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
324     }
325 }
326
327 /* Arrange for compacting the output cases for storage. */
328 static void
329 arrange_compaction (void)
330 {
331   int count_values = 0;
332
333   {
334     int i;
335     
336     /* Count up the number of `value's that will be output. */
337     for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
338       {
339         struct variable *v = dict_get_var (temp_dict, i);
340
341         if (v->name[0] != '#')
342           {
343             assert (v->nv > 0);
344             count_values += v->nv;
345           } 
346       }
347     assert (temporary == 2 || count_values <= dict_get_value_cnt (temp_dict));
348   }
349   
350   /* Compaction is only necessary if the number of `value's to output
351      differs from the number already present. */
352   compaction_nval = count_values;
353   compaction_necessary = (temporary == 2
354                           || count_values != dict_get_value_cnt (temp_dict));
355   
356   if (vfm_sink->init)
357     vfm_sink->init ();
358 }
359
360 /* Prepares the temporary case and compaction case. */
361 static void
362 make_temp_case (void)
363 {
364   temp_case = xmalloc (vfm_sink_info.case_size);
365
366   if (compaction_necessary)
367     compaction_case = xmalloc (sizeof (struct ccase)
368                                + sizeof (union value) * (compaction_nval - 1));
369 }
370
371 #if DEBUGGING
372 /* Returns the name of the variable that owns the index CCASE_INDEX
373    into ccase. */
374 static const char *
375 index_to_varname (int ccase_index)
376 {
377   int i;
378
379   for (i = 0; i < default_dict.nvar; i++)
380     {
381       struct variable *v = default_dict.var[i];
382       
383       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
384         return default_dict.var[i]->name;
385     }
386   return _("<NOVAR>");
387 }
388 #endif
389
390 /* Initializes temp_case from the vectors that say which `value's need
391    to be initialized just once, and which ones need to be
392    re-initialized before every case. */
393 static void
394 vector_initialization (void)
395 {
396   int i;
397   long *lp;
398   
399   /* Just once. */
400   for (i = 0; i < init_zero.n; i++)
401     temp_case->data[init_zero.vec[i]].f = 0.0;
402   for (i = 0; i < init_blanks.n; i++)
403     memset (temp_case->data[init_blanks.vec[i]].s, ' ', MAX_SHORT_STRING);
404
405   /* These vectors need to be repeatedly accessed, so we add a
406      sentinel to (hopefully) improve speed. */
407   vec_insert (&reinit_sysmis, -1);
408   vec_insert (&reinit_blanks, -1);
409
410   for (lp = reinit_sysmis.vec; *lp != -1;)
411     temp_case->data[*lp++].f = SYSMIS;
412   for (lp = reinit_blanks.vec; *lp != -1;)
413     memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
414   
415 #if DEBUGGING
416   printf ("vfm: init_zero=");
417   for (i = 0; i < init_zero.n; i++)
418     printf ("%s%s", i ? "," : "", index_to_varname (init_zero.vec[i]));
419   printf (" init_blanks=");
420   for (i = 0; i < init_blanks.n; i++)
421     printf ("%s%s", i ? "," : "", index_to_varname (init_blanks.vec[i]));
422   printf (" reinit_sysmis=");
423   for (lp = reinit_sysmis.vec; *lp != -1; lp++)
424     printf ("%s%s", lp != reinit_sysmis.vec ? "," : "",
425             index_to_varname (*lp));
426   printf (" reinit_blanks=");
427   for (lp = reinit_blanks.vec; *lp != -1; lp++)
428     printf ("%s%s", lp != reinit_blanks.vec ? "," : "",
429             index_to_varname (*lp));
430   printf ("\n");
431 #endif
432 }
433
434 /* Sets filter_index to an appropriate value. */
435 static void
436 setup_filter (void)
437 {
438   filter_var = dict_get_filter (default_dict);
439   
440   if (filter_var != NULL)
441     {
442       assert (filter_var->type == NUMERIC);
443       filter_index = filter_var->index;
444     } else {
445       filter_index = -1;
446     }
447 }
448
449 /* Sets all the lag-related variables based on value of n_lag. */
450 static void
451 setup_lag (void)
452 {
453   int i;
454   
455   if (n_lag == 0)
456     return;
457
458   lag_count = 0;
459   lag_head = 0;
460   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
461   for (i = 0; i < n_lag; i++)
462     lag_queue[i] = xmalloc (dict_get_value_cnt (temp_dict)
463                             * sizeof **lag_queue);
464 }
465
466 /* There is a lot of potential confusion in the vfm and related
467    routines over the number of `value's at each stage of the process.
468    Here is each nval count, with explanation, as set up by
469    open_active_file():
470
471    vfm_source_info.nval: Number of `value's in the cases returned by
472    the source stream.  This value turns out not to be very useful, but
473    we maintain it anyway.
474
475    vfm_sink_info.nval: Number of `value's in the cases after all
476    transformations have been performed.  Never less than
477    vfm_source_info.nval.
478
479    temp_dict->nval: Number of `value's in the cases after the
480    transformations leading up to TEMPORARY have been performed.  If
481    TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
482    Never less than vfm_sink_info.nval.
483
484    compaction_nval: Number of `value's in the cases after the
485    transformations leading up to TEMPORARY have been performed and the
486    case has been compacted by compact_case(), if compaction is
487    necessary.  This the number of `value's in the cases saved by the
488    sink stream.  (However, note that the cases passed to the sink
489    stream have not yet been compacted.  It is the responsibility of
490    the data sink to call compact_case().)  This may be less than,
491    greater than, or equal to vfm_source_info.nval.  `compaction'
492    becomes the new value of default_dict.nval after the procedure is
493    completed.
494
495    default_dict.nval: This is often an alias for temp_dict->nval.  As
496    such it can really have no separate existence until the procedure
497    is complete.  For this reason it should *not* be referenced inside
498    the execution of a procedure. */
499 /* Makes all preparations for reading from the data source and writing
500    to the data sink. */
501 static void
502 open_active_file (void)
503 {
504   /* Sometimes we want to refer to the dictionary that applies to the
505      data actually written to the sink.  This is either temp_dict or
506      default_dict.  However, if TEMPORARY is not on, then temp_dict
507      does not apply.  So, we can set temp_dict to default_dict in this
508      case. */
509   if (!temporary)
510     {
511       temp_trns = n_trns;
512       temp_dict = default_dict;
513     }
514
515   /* No cases passed to the procedure yet. */
516   case_count = 0;
517
518   /* The rest. */
519   prepare_for_writing ();
520   arrange_compaction ();
521   make_temp_case ();
522   vector_initialization ();
523   discard_ctl_stack ();
524   setup_filter ();
525   setup_lag ();
526
527   /* Debug output. */
528   debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
529                  vfm_source->name, vfm_sink->name));
530   debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
531                  "temp_dict->nval=%d, compaction_nval=%d, "
532                  "default_dict.nval=%d\n",
533                  vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
534                  compaction_nval, default_dict.nval));
535 }
536 \f
537 /* Closes the active file. */
538 static void
539 close_active_file (struct write_case_data *data)
540 {
541   /* Close the current case group. */
542   if (case_count && data->endfunc != NULL)
543     data->endfunc (data->aux);
544
545   /* Stop lagging (catch up?). */
546   if (n_lag)
547     {
548       int i;
549       
550       for (i = 0; i < n_lag; i++)
551         free (lag_queue[i]);
552       free (lag_queue);
553       n_lag = 0;
554     }
555   
556   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
557      off TEMPORARY. */
558   if (temporary)
559     {
560       dict_destroy (default_dict);
561       default_dict = temp_dict;
562       temp_dict = NULL;
563     }
564
565   /* Finish compaction. */
566   if (compaction_necessary)
567     finish_compaction ();
568     
569   /* Old data sink --> New data source. */
570   if (vfm_source && vfm_source->destroy_source)
571     vfm_source->destroy_source ();
572   
573   vfm_source = vfm_sink;
574   vfm_source_info.ncases = vfm_sink_info.ncases;
575   vfm_source_info.nval = compaction_nval;
576   vfm_source_info.case_size = (sizeof (struct ccase)
577                                + (compaction_nval - 1) * sizeof (union value));
578   if (vfm_source->mode)
579     vfm_source->mode ();
580
581   /* Old data sink is gone now. */
582   vfm_sink = NULL;
583
584   /* Cancel TEMPORARY. */
585   cancel_temporary ();
586
587   /* Free temporary cases. */
588   free (temp_case);
589   temp_case = NULL;
590
591   free (compaction_case);
592   compaction_case = NULL;
593
594   /* Cancel PROCESS IF. */
595   expr_free (process_if_expr);
596   process_if_expr = NULL;
597
598   /* Cancel FILTER if temporary. */
599   if (filter_var != NULL && !FILTER_before_TEMPORARY)
600     dict_set_filter (default_dict, NULL);
601
602   /* Cancel transformations. */
603   cancel_transformations ();
604
605   /* Clear value-initialization vectors. */
606   vec_clear (&init_zero);
607   vec_clear (&init_blanks);
608   vec_clear (&reinit_sysmis);
609   vec_clear (&reinit_blanks);
610
611   /* Turn off case limiter. */
612   dict_set_case_limit (default_dict, 0);
613
614   /* Clear VECTOR vectors. */
615   dict_clear_vectors (default_dict);
616
617   debug_printf (("vfm: procedure complete\n\n"));
618 }
619 \f
620 /* Disk case stream. */
621
622 /* Associated files. */
623 FILE *disk_source_file;
624 FILE *disk_sink_file;
625
626 /* Initializes the disk sink. */
627 static void
628 disk_stream_init (void)
629 {
630   disk_sink_file = tmpfile ();
631   if (!disk_sink_file)
632     {
633       msg (ME, _("An error occurred attempting to create a temporary "
634                  "file for use as the active file: %s."),
635            strerror (errno));
636       err_failure ();
637     }
638 }
639
640 /* Reads all cases from the disk source and passes them one by one to
641    write_case(). */
642 static void
643 disk_stream_read (write_case_func *write_case, write_case_data wc_data)
644 {
645   int i;
646
647   for (i = 0; i < vfm_source_info.ncases; i++)
648     {
649       if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
650         {
651           msg (ME, _("An error occurred while attempting to read from "
652                "a temporary file created for the active file: %s."),
653                strerror (errno));
654           err_failure ();
655           return;
656         }
657
658       if (!write_case (wc_data))
659         return;
660     }
661 }
662
663 /* Writes temp_case to the disk sink. */
664 static void
665 disk_stream_write (void)
666 {
667   union value *src_case;
668
669   if (compaction_necessary)
670     {
671       compact_case (compaction_case, temp_case);
672       src_case = (union value *) compaction_case;
673     }
674   else src_case = (union value *) temp_case;
675
676   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
677               disk_sink_file) != 1)
678     {
679       msg (ME, _("An error occurred while attempting to write to a "
680                  "temporary file used as the active file: %s."),
681            strerror (errno));
682       err_failure ();
683     }
684 }
685
686 /* Switches the stream from a sink to a source. */
687 static void
688 disk_stream_mode (void)
689 {
690   /* Rewind the sink. */
691   if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
692     {
693       msg (ME, _("An error occurred while attempting to rewind a "
694                  "temporary file used as the active file: %s."),
695            strerror (errno));
696       err_failure ();
697     }
698   
699   /* Sink --> source variables. */
700   disk_source_file = disk_sink_file;
701 }
702
703 /* Destroys the source's internal data. */
704 static void
705 disk_stream_destroy_source (void)
706 {
707   if (disk_source_file)
708     {
709       fclose (disk_source_file);
710       disk_source_file = NULL;
711     }
712 }
713
714 /* Destroys the sink's internal data. */
715 static void
716 disk_stream_destroy_sink (void)
717 {
718   if (disk_sink_file)
719     {
720       fclose (disk_sink_file);
721       disk_sink_file = NULL;
722     }
723 }
724
725 /* Disk stream. */
726 struct case_stream vfm_disk_stream = 
727   {
728     disk_stream_init,
729     disk_stream_read,
730     disk_stream_write,
731     disk_stream_mode,
732     disk_stream_destroy_source,
733     disk_stream_destroy_sink,
734     "disk",
735   };
736 \f
737 /* Memory case stream. */
738
739 /* List of cases stored in the stream. */
740 struct case_list *memory_source_cases;
741 struct case_list *memory_sink_cases;
742
743 /* Current case. */
744 struct case_list *memory_sink_iter;
745
746 /* Maximum number of cases. */
747 int memory_sink_max_cases;
748
749 /* Initializes the memory stream variables for writing. */
750 static void
751 memory_stream_init (void)
752 {
753   memory_sink_cases = NULL;
754   memory_sink_iter = NULL;
755   
756   assert (compaction_nval);
757   memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
758 }
759
760 /* Reads the case stream from memory and passes it to write_case(). */
761 static void
762 memory_stream_read (write_case_func *write_case, write_case_data wc_data)
763 {
764   while (memory_source_cases != NULL)
765     {
766       memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
767       
768       {
769         struct case_list *current = memory_source_cases;
770         memory_source_cases = memory_source_cases->next;
771         free (current);
772       }
773       
774       if (!write_case (wc_data))
775         return;
776     }
777 }
778
779 /* Writes temp_case to the memory stream. */
780 static void
781 memory_stream_write (void)
782 {
783   struct case_list *new_case = malloc (sizeof (struct case_list)
784                                        + ((compaction_nval - 1)
785                                           * sizeof (union value)));
786
787   /* If we've got memory to spare then add it to the linked list. */
788   if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
789     {
790       if (compaction_necessary)
791         compact_case (&new_case->c, temp_case);
792       else
793         memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
794
795       /* Append case to linked list. */
796       if (memory_sink_cases)
797         memory_sink_iter = memory_sink_iter->next = new_case;
798       else
799         memory_sink_iter = memory_sink_cases = new_case;
800     }
801   else
802     {
803       /* Out of memory.  Write the active file to disk. */
804       struct case_list *cur, *next;
805
806       /* Notify the user. */
807       if (!new_case)
808         msg (MW, _("Virtual memory exhausted.  Paging active file "
809                    "to disk."));
810       else
811         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
812                    "overflowed.  Paging active file to disk."),
813              MAX_WORKSPACE / 1024, memory_sink_max_cases,
814              compaction_nval * sizeof (union value));
815
816       free (new_case);
817
818       /* Switch to a disk sink. */
819       vfm_sink = &vfm_disk_stream;
820       vfm_sink->init ();
821       paging = 1;
822
823       /* Terminate the list. */
824       if (memory_sink_iter)
825         memory_sink_iter->next = NULL;
826
827       /* Write the cases to disk and destroy them.  We can't call
828          vfm->sink->write() because of compaction. */
829       for (cur = memory_sink_cases; cur; cur = next)
830         {
831           next = cur->next;
832           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
833                       disk_sink_file) != 1)
834             {
835               msg (ME, _("An error occurred while attempting to "
836                          "write to a temporary file created as the "
837                          "active file, while paging to disk: %s."),
838                    strerror (errno));
839               err_failure ();
840             }
841           free (cur);
842         }
843
844       /* Write the current case to disk. */
845       vfm_sink->write ();
846     }
847 }
848
849 /* If the data is stored in memory, causes it to be written to disk.
850    To be called only *between* procedure()s, not within them. */
851 void
852 page_to_disk (void)
853 {
854   if (vfm_source == &vfm_memory_stream)
855     {
856       /* Switch to a disk sink. */
857       vfm_sink = &vfm_disk_stream;
858       vfm_sink->init ();
859       paging = 1;
860       
861       /* Write the cases to disk and destroy them.  We can't call
862          vfm->sink->write() because of compaction. */
863       {
864         struct case_list *cur, *next;
865         
866         for (cur = memory_source_cases; cur; cur = next)
867           {
868             next = cur->next;
869             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
870                         disk_sink_file) != 1)
871               {
872                 msg (ME, _("An error occurred while attempting to "
873                            "write to a temporary file created as the "
874                            "active file, while paging to disk: %s."),
875                      strerror (errno));
876                 err_failure ();
877               }
878             free (cur);
879           }
880       }
881       
882       vfm_source = &vfm_disk_stream;
883       vfm_source->mode ();
884
885       vfm_sink = NULL;
886     }
887 }
888
889 /* Switch the memory stream from sink to source mode. */
890 static void
891 memory_stream_mode (void)
892 {
893   /* Terminate the list. */
894   if (memory_sink_iter)
895     memory_sink_iter->next = NULL;
896
897   /* Sink --> source variables. */
898   memory_source_cases = memory_sink_cases;
899   memory_sink_cases = NULL;
900 }
901
902 /* Destroy all memory source data. */
903 static void
904 memory_stream_destroy_source (void)
905 {
906   struct case_list *cur, *next;
907   
908   for (cur = memory_source_cases; cur; cur = next)
909     {
910       next = cur->next;
911       free (cur);
912     }
913   memory_source_cases = NULL;
914 }
915
916 /* Destroy all memory sink data. */
917 static void
918 memory_stream_destroy_sink (void)
919 {
920   struct case_list *cur, *next;
921   
922   for (cur = memory_sink_cases; cur; cur = next)
923     {
924       next = cur->next;
925       free (cur);
926     }
927   memory_sink_cases = NULL;
928 }
929   
930 /* Memory stream. */
931 struct case_stream vfm_memory_stream = 
932   {
933     memory_stream_init,
934     memory_stream_read,
935     memory_stream_write,
936     memory_stream_mode,
937     memory_stream_destroy_source,
938     memory_stream_destroy_sink,
939     "memory",
940   };
941 \f
942 #include "debug-print.h"
943
944 /* Add temp_case to the lag queue. */
945 static void
946 lag_case (void)
947 {
948   if (lag_count < n_lag)
949     lag_count++;
950   memcpy (lag_queue[lag_head], temp_case,
951           sizeof (union value) * dict_get_value_cnt (temp_dict));
952   if (++lag_head >= n_lag)
953     lag_head = 0;
954 }
955
956 /* Returns a pointer to the lagged case from N_BEFORE cases before the
957    current one, or NULL if there haven't been that many cases yet. */
958 struct ccase *
959 lagged_case (int n_before)
960 {
961   assert (n_before <= n_lag);
962   if (n_before > lag_count)
963     return NULL;
964   
965   {
966     int index = lag_head - n_before;
967     if (index < 0)
968       index += n_lag;
969     return lag_queue[index];
970   }
971 }
972    
973 /* Transforms temp_case and writes it to the replacement active file
974    if advisable.  Returns nonzero if more cases can be accepted, zero
975    otherwise.  Do not call this function again after it has returned
976    zero once.  */
977 int
978 procedure_write_case (write_case_data wc_data)
979 {
980   /* Index of current transformation. */
981   int cur_trns;
982
983   /* Return value: whether it's reasonable to write any more cases. */
984   int more_cases = 1;
985
986   debug_printf ((_("transform: ")));
987
988   cur_trns = f_trns;
989   for (;;)
990     {
991       /* Output the case if this is temp_trns. */
992       if (cur_trns == temp_trns)
993         {
994           debug_printf (("REC"));
995
996           if (n_lag)
997             lag_case ();
998           
999           vfm_sink_info.ncases++;
1000           vfm_sink->write ();
1001
1002           if (dict_get_case_limit (default_dict))
1003             more_cases = (vfm_sink_info.ncases
1004                           < dict_get_case_limit (default_dict));
1005         }
1006
1007       /* Are we done? */
1008       if (cur_trns >= n_trns)
1009         break;
1010       
1011       debug_printf (("$%d", cur_trns));
1012
1013       /* Decide which transformation should come next. */
1014       {
1015         int code;
1016         
1017         code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
1018         switch (code)
1019           {
1020           case -1:
1021             /* Next transformation. */
1022             cur_trns++;
1023             break;
1024           case -2:
1025             /* Delete this case. */
1026             goto done;
1027           default:
1028             /* Go to that transformation. */
1029             cur_trns = code;
1030             break;
1031           }
1032       }
1033     }
1034
1035   /* Call the beginning of group function. */
1036   if (!case_count && wc_data->beginfunc != NULL)
1037     wc_data->beginfunc (wc_data->aux);
1038
1039   /* Call the procedure if there is one and FILTER and PROCESS IF
1040      don't prohibit it. */
1041   if (wc_data->procfunc != NULL
1042       && !FILTERED
1043       && (process_if_expr == NULL ||
1044           expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
1045     wc_data->procfunc (temp_case, wc_data->aux);
1046
1047   case_count++;
1048   
1049 done:
1050   debug_putc ('\n', stdout);
1051   
1052   {
1053     long *lp;
1054
1055     /* This case is finished.  Initialize the variables for the next case. */
1056     for (lp = reinit_sysmis.vec; *lp != -1;)
1057       temp_case->data[*lp++].f = SYSMIS;
1058     for (lp = reinit_blanks.vec; *lp != -1;)
1059       memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
1060   }
1061   
1062   /* Return previously determined value. */
1063   return more_cases;
1064 }
1065
1066 /* Appends TRNS to t_trns[], the list of all transformations to be
1067    performed on data as it is read from the active file. */
1068 void
1069 add_transformation (struct trns_header * trns)
1070 {
1071   if (n_trns >= m_trns)
1072     {
1073       m_trns += 16;
1074       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1075     }
1076   t_trns[n_trns] = trns;
1077   trns->index = n_trns++;
1078 }
1079
1080 /* Cancels all active transformations, including any transformations
1081    created by the input program. */
1082 void
1083 cancel_transformations (void)
1084 {
1085   int i;
1086   for (i = 0; i < n_trns; i++)
1087     {
1088       if (t_trns[i]->free)
1089         t_trns[i]->free (t_trns[i]);
1090       free (t_trns[i]);
1091     }
1092   n_trns = f_trns = 0;
1093   if (m_trns > 32)
1094     {
1095       free (t_trns);
1096       m_trns = 0;
1097     }
1098 }
1099
1100 /* Dumps out the values of all the split variables for the case C. */
1101 static void
1102 dump_splits (struct ccase *c)
1103 {
1104   struct variable *const *split;
1105   struct tab_table *t;
1106   size_t split_cnt;
1107   int i;
1108
1109   split_cnt = dict_get_split_cnt (default_dict);
1110   t = tab_create (3, split_cnt + 1, 0);
1111   tab_dim (t, tab_natural_dimensions);
1112   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
1113   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
1114   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1115   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1116   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1117   split = dict_get_split_vars (default_dict);
1118   for (i = 0; i < split_cnt; i++)
1119     {
1120       struct variable *v = split[i];
1121       char temp_buf[80];
1122       const char *val_lab;
1123
1124       assert (v->type == NUMERIC || v->type == ALPHA);
1125       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1126       
1127       {
1128         union value val = c->data[v->fv];
1129         if (v->type == ALPHA)
1130           val.c = c->data[v->fv].s;
1131         data_out (temp_buf, &v->print, &val);
1132       }
1133       
1134       temp_buf[v->print.w] = 0;
1135       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1136
1137       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1138       if (val_lab)
1139         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1140     }
1141   tab_flags (t, SOMF_NO_TITLE);
1142   tab_submit (t);
1143 }
1144
1145 /* This procfunc is substituted for the user-supplied procfunc when
1146    SPLIT FILE is active.  This function forms a wrapper around that
1147    procfunc by dividing the input into series. */
1148 static int
1149 SPLIT_FILE_procfunc (struct ccase *c, void *data_)
1150 {
1151   struct write_case_data *data = data_;
1152   static struct ccase *prev_case;
1153   struct variable *const *split;
1154   size_t split_cnt;
1155   size_t i;
1156
1157   /* The first case always begins a new series.  We also need to
1158      preserve the values of the case for later comparison. */
1159   if (case_count == 0)
1160     {
1161       if (prev_case)
1162         free (prev_case);
1163       prev_case = xmalloc (vfm_sink_info.case_size);
1164       memcpy (prev_case, c, vfm_sink_info.case_size);
1165
1166       dump_splits (c);
1167       if (data->beginfunc != NULL)
1168         data->beginfunc (data->aux);
1169       
1170       return data->procfunc (c, data->aux);
1171     }
1172
1173   /* Compare the value of each SPLIT FILE variable to the values on
1174      the previous case. */
1175   split = dict_get_split_vars (default_dict);
1176   split_cnt = dict_get_split_cnt (default_dict);
1177   for (i = 0; i < split_cnt; i++)
1178     {
1179       struct variable *v = split[i];
1180       
1181       switch (v->type)
1182         {
1183         case NUMERIC:
1184           if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
1185             goto not_equal;
1186           break;
1187         case ALPHA:
1188           if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
1189             goto not_equal;
1190           break;
1191         default:
1192           assert (0);
1193         }
1194     }
1195   return data->procfunc (c, data->aux);
1196   
1197 not_equal:
1198   /* The values of the SPLIT FILE variable are different from the
1199      values on the previous case.  That means that it's time to begin
1200      a new series. */
1201   if (data->endfunc != NULL)
1202     data->endfunc (data->aux);
1203   dump_splits (c);
1204   if (data->beginfunc != NULL)
1205     data->beginfunc (data->aux);
1206   memcpy (prev_case, c, vfm_sink_info.case_size);
1207   return data->procfunc (c, data->aux);
1208 }
1209 \f
1210 /* Case compaction. */
1211
1212 /* Copies case SRC to case DEST, compacting it in the process. */
1213 void
1214 compact_case (struct ccase *dest, const struct ccase *src)
1215 {
1216   int i;
1217   int nval = 0;
1218   size_t var_cnt;
1219   
1220   assert (compaction_necessary);
1221
1222   if (temporary == 2)
1223     {
1224       if (dest != compaction_case)
1225         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1226       return;
1227     }
1228
1229   /* Copy all the variables except the scratch variables from SRC to
1230      DEST. */
1231   var_cnt = dict_get_var_cnt (default_dict);
1232   for (i = 0; i < var_cnt; i++)
1233     {
1234       struct variable *v = dict_get_var (default_dict, i);
1235       
1236       if (v->name[0] == '#')
1237         continue;
1238
1239       if (v->type == NUMERIC)
1240         dest->data[nval++] = src->data[v->fv];
1241       else
1242         {
1243           int w = DIV_RND_UP (v->width, sizeof (union value));
1244           
1245           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1246           nval += w;
1247         }
1248     }
1249 }
1250
1251 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1252 static void
1253 finish_compaction (void)
1254 {
1255   int i;
1256
1257   for (i = 0; i < dict_get_var_cnt (default_dict); )
1258     {
1259       struct variable *v = dict_get_var (default_dict, i);
1260
1261       if (v->name[0] == '#') 
1262         dict_delete_var (default_dict, v);
1263       else
1264         i++;
1265     }
1266   dict_compact_values (default_dict);
1267 }
1268
1269