Beginning of VFM cleanup.
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include <assert.h>
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "do-ifP.h"
32 #include "error.h"
33 #include "expr.h"
34 #include "misc.h"
35 #include "random.h"
36 #include "settings.h"
37 #include "som.h"
38 #include "str.h"
39 #include "tab.h"
40 #include "var.h"
41 #include "value-labels.h"
42
43 /*
44    Virtual File Manager (vfm):
45
46    vfm is used to process data files.  It uses the model that
47    data is read from one stream (the data source), processed,
48    then written to another (the data sink).  The data source is
49    then deleted and the data sink becomes the data source for the
50    next procedure. */
51
52 #include "debug-print.h"
53
54 /* Procedure execution data. */
55 struct write_case_data
56   {
57     void (*beginfunc) (void *);
58     int (*procfunc) (struct ccase *, void *);
59     void (*endfunc) (void *);
60     void *aux;
61   };
62
63 /* The current active file, from which cases are read. */
64 struct case_source *vfm_source;
65
66 /* The replacement active file, to which cases are written. */
67 struct case_sink *vfm_sink;
68
69 /* Information about the data source. */
70 struct stream_info vfm_source_info;
71
72 /* Information about the data sink. */
73 struct stream_info vfm_sink_info;
74
75 /* Nonzero if the case needs to have values deleted before being
76    stored, zero otherwise. */
77 int compaction_necessary;
78
79 /* Number of values after compaction, or the same as
80    vfm_sink_info.nval, if compaction is not necessary. */
81 int compaction_nval;
82
83 /* Temporary case buffer with enough room for `compaction_nval'
84    `value's. */
85 struct ccase *compaction_case;
86
87 /* Nonzero means that we've overflowed our allotted workspace.
88    After that happens once during a session, we always store the
89    active file on disk instead of in memory.  (This policy may be
90    too aggressive.) */
91 static int workspace_overflow = 0;
92
93 /* Time at which vfm was last invoked. */
94 time_t last_vfm_invocation;
95
96 /* Number of cases passed to proc_func(). */
97 static int case_count;
98
99 /* Lag queue. */
100 int n_lag;                      /* Number of cases to lag. */
101 static int lag_count;           /* Number of cases in lag_queue so far. */
102 static int lag_head;            /* Index where next case will be added. */
103 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
104
105 static void open_active_file (void);
106 static void close_active_file (struct write_case_data *);
107 static int SPLIT_FILE_procfunc (struct ccase *, void *);
108 static void finish_compaction (void);
109 static void lag_case (void);
110 static int procedure_write_case (struct write_case_data *);
111 static void clear_temp_case (void);
112 static int exclude_this_case (void);
113 \f
114 /* Public functions. */
115
116 /* Reads all the cases from the active file, transforms them by
117    the active set of transformations, calls PROCFUNC with CURCASE
118    set to the case, and writes them to a new active file.
119
120    Divides the active file into zero or more series of one or more
121    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
122    called after each series.
123
124    Arbitrary user-specified data AUX is passed to BEGINFUNC,
125    PROCFUNC, and ENDFUNC as auxiliary data. */
126 void
127 procedure (void (*beginfunc) (void *),
128            int (*procfunc) (struct ccase *curcase, void *),
129            void (*endfunc) (void *),
130            void *aux)
131 {
132   static int recursive_call;
133
134   struct write_case_data procedure_write_data;
135   struct write_case_data split_file_data;
136
137   assert (++recursive_call == 1);
138
139   if (dict_get_split_cnt (default_dict) == 0) 
140     {
141       /* Normally we just use the data passed by the user. */
142       procedure_write_data.beginfunc = beginfunc;
143       procedure_write_data.procfunc = procfunc;
144       procedure_write_data.endfunc = endfunc;
145       procedure_write_data.aux = aux;
146     }
147   else
148     {
149       /* Under SPLIT FILE, we add a layer of indirection. */
150       procedure_write_data.beginfunc = NULL;
151       procedure_write_data.procfunc = SPLIT_FILE_procfunc;
152       procedure_write_data.endfunc = endfunc;
153       procedure_write_data.aux = &split_file_data;
154
155       split_file_data.beginfunc = beginfunc;
156       split_file_data.procfunc = procfunc;
157       split_file_data.endfunc = endfunc;
158       split_file_data.aux = aux;
159     }
160
161   last_vfm_invocation = time (NULL);
162
163   open_active_file ();
164   if (vfm_source != NULL) 
165     vfm_source->class->read (vfm_source,
166                              procedure_write_case, &procedure_write_data);
167   close_active_file (&procedure_write_data);
168
169   assert (--recursive_call == 0);
170 }
171 \f
172 /* Active file processing support.  Subtly different semantics from
173    procedure(). */
174
175 static int process_active_file_write_case (struct write_case_data *data);
176
177 /* The casefunc might want us to stop calling it. */
178 static int not_canceled;
179
180 /* Reads all the cases from the active file and passes them one-by-one
181    to CASEFUNC in temp_case.  Before any cases are passed, calls
182    BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
183    BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
184    file by calling process_active_file_output_case().
185
186    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
187 void
188 process_active_file (void (*beginfunc) (void *),
189                      int (*casefunc) (struct ccase *curcase, void *),
190                      void (*endfunc) (void *),
191                      void *aux)
192 {
193   struct write_case_data process_active_write_data;
194
195   process_active_write_data.beginfunc = beginfunc;
196   process_active_write_data.procfunc = casefunc;
197   process_active_write_data.endfunc = endfunc;
198   process_active_write_data.aux = aux;
199
200   not_canceled = 1;
201
202   open_active_file ();
203   beginfunc (aux);
204   
205   /* There doesn't necessarily need to be an active file. */
206   if (vfm_source != NULL)
207     vfm_source->class->read (vfm_source, process_active_file_write_case,
208                              &process_active_write_data);
209   
210   endfunc (aux);
211   close_active_file (&process_active_write_data);
212 }
213
214 /* Pass the current case to casefunc. */
215 static int
216 process_active_file_write_case (struct write_case_data *data)
217 {
218   /* Index of current transformation. */
219   int cur_trns;
220
221   for (cur_trns = f_trns ; cur_trns != temp_trns; )
222     {
223       int code;
224         
225       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
226       switch (code)
227         {
228         case -1:
229           /* Next transformation. */
230           cur_trns++;
231           break;
232         case -2:
233           /* Delete this case. */
234           goto done;
235         default:
236           /* Go to that transformation. */
237           cur_trns = code;
238           break;
239         }
240     }
241
242   if (n_lag)
243     lag_case ();
244           
245   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
246   if (not_canceled && !exclude_this_case ())
247     not_canceled = data->procfunc (temp_case, data->aux);
248   
249   case_count++;
250   
251  done:
252   clear_temp_case ();
253
254   return 1;
255 }
256
257 /* Write temp_case to the active file. */
258 void
259 process_active_file_output_case (void)
260 {
261   vfm_sink_info.ncases++;
262   vfm_sink->class->write (vfm_sink, temp_case);
263 }
264 \f
265 /* Opening the active file. */
266
267 /* It might be usefully noted that the following several functions are
268    given in the order that they are called by open_active_file(). */
269
270 /* Prepare to write to the replacement active file. */
271 static void
272 prepare_for_writing (void)
273 {
274   /* FIXME: If ALL the conditions listed below hold true, then the
275      replacement active file is guaranteed to be identical to the
276      original active file:
277
278      1. TEMPORARY was the first transformation, OR, there were no
279      transformations at all.
280
281      2. Input is not coming from an input program.
282
283      3. Compaction is not necessary.
284
285      So, in this case, we shouldn't have to replace the active
286      file--it's just a waste of time and space. */
287
288   vfm_sink_info.ncases = 0;
289   vfm_sink_info.nval = dict_get_next_value_idx (default_dict);
290   vfm_sink_info.case_size = dict_get_case_size (default_dict);
291   
292   if (vfm_sink == NULL)
293     {
294       if (vfm_sink_info.case_size * vfm_source_info.ncases > set_max_workspace
295           && !workspace_overflow)
296         {
297           msg (MW, _("Workspace overflow predicted.  Max workspace is "
298                      "currently set to %d KB (%d cases at %d bytes each).  "
299                      "Writing active file to disk."),
300                set_max_workspace / 1024, set_max_workspace / vfm_sink_info.case_size,
301                vfm_sink_info.case_size);
302           
303           workspace_overflow = 1;
304         }
305
306       if (workspace_overflow)
307         vfm_sink = create_case_sink (&disk_sink_class, NULL);
308       else
309         vfm_sink = create_case_sink (&memory_sink_class, NULL);
310     }
311 }
312
313 /* Arrange for compacting the output cases for storage. */
314 static void
315 arrange_compaction (void)
316 {
317   int count_values = 0;
318
319   {
320     int i;
321     
322     /* Count up the number of `value's that will be output. */
323     for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
324       {
325         struct variable *v = dict_get_var (temp_dict, i);
326
327         if (v->name[0] != '#')
328           {
329             assert (v->nv > 0);
330             count_values += v->nv;
331           } 
332       }
333     assert (temporary == 2
334             || count_values <= dict_get_next_value_idx (temp_dict));
335   }
336   
337   /* Compaction is only necessary if the number of `value's to output
338      differs from the number already present. */
339   compaction_nval = count_values;
340   if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
341     compaction_necessary = 1;
342   else
343     compaction_necessary = 0;
344   
345   if (vfm_sink->class->open != NULL)
346     vfm_sink->class->open (vfm_sink);
347 }
348
349 /* Prepares the temporary case and compaction case. */
350 static void
351 make_temp_case (void)
352 {
353   temp_case = xmalloc (vfm_sink_info.case_size);
354
355   if (compaction_necessary)
356     compaction_case = xmalloc (sizeof (struct ccase)
357                                + sizeof (union value) * (compaction_nval - 1));
358 }
359
360 #if DEBUGGING
361 /* Returns the name of the variable that owns the index CCASE_INDEX
362    into ccase. */
363 static const char *
364 index_to_varname (int ccase_index)
365 {
366   int i;
367
368   for (i = 0; i < default_dict.nvar; i++)
369     {
370       struct variable *v = default_dict.var[i];
371       
372       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
373         return default_dict.var[i]->name;
374     }
375   return _("<NOVAR>");
376 }
377 #endif
378
379 /* Initializes temp_case from the vectors that say which `value's
380    need to be initialized just once, and which ones need to be
381    re-initialized before every case. */
382 static void
383 vector_initialization (void)
384 {
385   size_t var_cnt = dict_get_var_cnt (default_dict);
386   size_t i;
387   
388   for (i = 0; i < var_cnt; i++) 
389     {
390       struct variable *v = dict_get_var (default_dict, i);
391
392       if (v->type == NUMERIC) 
393         {
394           if (v->reinit)
395             temp_case->data[v->fv].f = 0.0;
396           else
397             temp_case->data[v->fv].f = SYSMIS;
398         }
399       else
400         memset (temp_case->data[v->fv].s, ' ', v->width);
401     }
402 }
403
404 /* Sets all the lag-related variables based on value of n_lag. */
405 static void
406 setup_lag (void)
407 {
408   int i;
409   
410   if (n_lag == 0)
411     return;
412
413   lag_count = 0;
414   lag_head = 0;
415   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
416   for (i = 0; i < n_lag; i++)
417     lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
418 }
419
420 /* There is a lot of potential confusion in the vfm and related
421    routines over the number of `value's at each stage of the process.
422    Here is each nval count, with explanation, as set up by
423    open_active_file():
424
425    vfm_source_info.nval: Number of `value's in the cases returned by
426    the source stream.  This value turns out not to be very useful, but
427    we maintain it anyway.
428
429    vfm_sink_info.nval: Number of `value's in the cases after all
430    transformations have been performed.  Never less than
431    vfm_source_info.nval.
432
433    temp_dict->nval: Number of `value's in the cases after the
434    transformations leading up to TEMPORARY have been performed.  If
435    TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
436    Never less than vfm_sink_info.nval.
437
438    compaction_nval: Number of `value's in the cases after the
439    transformations leading up to TEMPORARY have been performed and the
440    case has been compacted by compact_case(), if compaction is
441    necessary.  This the number of `value's in the cases saved by the
442    sink stream.  (However, note that the cases passed to the sink
443    stream have not yet been compacted.  It is the responsibility of
444    the data sink to call compact_case().)  This may be less than,
445    greater than, or equal to vfm_source_info.nval.  `compaction'
446    becomes the new value of default_dict.nval after the procedure is
447    completed.
448
449    default_dict.nval: This is often an alias for temp_dict->nval.  As
450    such it can really have no separate existence until the procedure
451    is complete.  For this reason it should *not* be referenced inside
452    the execution of a procedure. */
453 /* Makes all preparations for reading from the data source and writing
454    to the data sink. */
455 static void
456 open_active_file (void)
457 {
458   /* Sometimes we want to refer to the dictionary that applies to the
459      data actually written to the sink.  This is either temp_dict or
460      default_dict.  However, if TEMPORARY is not on, then temp_dict
461      does not apply.  So, we can set temp_dict to default_dict in this
462      case. */
463   if (!temporary)
464     {
465       temp_trns = n_trns;
466       temp_dict = default_dict;
467     }
468
469   /* No cases passed to the procedure yet. */
470   case_count = 0;
471
472   /* The rest. */
473   prepare_for_writing ();
474   arrange_compaction ();
475   make_temp_case ();
476   vector_initialization ();
477   discard_ctl_stack ();
478   setup_lag ();
479
480   /* Debug output. */
481   debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
482                  vfm_source->name, vfm_sink->name));
483   debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
484                  "temp_dict->nval=%d, compaction_nval=%d, "
485                  "default_dict.nval=%d\n",
486                  vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
487                  compaction_nval, default_dict.nval));
488 }
489 \f
490 /* Closes the active file. */
491 static void
492 close_active_file (struct write_case_data *data)
493 {
494   /* Close the current case group. */
495   if (case_count && data->endfunc != NULL)
496     data->endfunc (data->aux);
497
498   /* Stop lagging (catch up?). */
499   if (n_lag)
500     {
501       int i;
502       
503       for (i = 0; i < n_lag; i++)
504         free (lag_queue[i]);
505       free (lag_queue);
506       n_lag = 0;
507     }
508   
509   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
510      off TEMPORARY. */
511   if (temporary)
512     {
513       dict_destroy (default_dict);
514       default_dict = temp_dict;
515       temp_dict = NULL;
516     }
517
518   /* Finish compaction. */
519   if (compaction_necessary)
520     finish_compaction ();
521     
522   /* Old data sink --> New data source. */
523   if (vfm_source != NULL) 
524     {
525       if (vfm_source->class->destroy != NULL)
526         vfm_source->class->destroy (vfm_source);
527       free (vfm_source);
528     }
529
530   vfm_source = vfm_sink->class->make_source (vfm_sink);
531   vfm_source_info.ncases = vfm_sink_info.ncases;
532   vfm_source_info.nval = compaction_nval;
533   vfm_source_info.case_size = (sizeof (struct ccase)
534                                + (compaction_nval - 1) * sizeof (union value));
535
536   /* Old data sink is gone now. */
537   free (vfm_sink);
538   vfm_sink = NULL;
539
540   /* Cancel TEMPORARY. */
541   cancel_temporary ();
542
543   /* Free temporary cases. */
544   free (temp_case);
545   temp_case = NULL;
546
547   free (compaction_case);
548   compaction_case = NULL;
549
550   /* Cancel PROCESS IF. */
551   expr_free (process_if_expr);
552   process_if_expr = NULL;
553
554   /* Cancel FILTER if temporary. */
555   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
556     dict_set_filter (default_dict, NULL);
557
558   /* Cancel transformations. */
559   cancel_transformations ();
560
561   /* Turn off case limiter. */
562   dict_set_case_limit (default_dict, 0);
563
564   /* Clear VECTOR vectors. */
565   dict_clear_vectors (default_dict);
566
567   debug_printf (("vfm: procedure complete\n\n"));
568 }
569 \f
570 /* Disk case stream. */
571
572 /* Initializes the disk sink. */
573 static void
574 disk_sink_create (struct case_sink *sink)
575 {
576   sink->aux = tmpfile ();
577   if (!sink->aux)
578     {
579       msg (ME, _("An error occurred attempting to create a temporary "
580                  "file for use as the active file: %s."),
581            strerror (errno));
582       err_failure ();
583     }
584 }
585
586 /* Writes temp_case to the disk sink. */
587 static void
588 disk_sink_write (struct case_sink *sink, struct ccase *c)
589 {
590   FILE *file = sink->aux;
591   union value *src_case;
592
593   if (compaction_necessary)
594     {
595       compact_case (compaction_case, c);
596       src_case = compaction_case->data;
597     }
598   else src_case = c->data;
599
600   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1, file) != 1)
601     {
602       msg (ME, _("An error occurred while attempting to write to a "
603                  "temporary file used as the active file: %s."),
604            strerror (errno));
605       err_failure ();
606     }
607 }
608
609 /* Destroys the sink's internal data. */
610 static void
611 disk_sink_destroy (struct case_sink *sink)
612 {
613   FILE *file = sink->aux;
614   if (file != NULL)
615     fclose (file);
616 }
617
618 /* Closes and destroys the sink and returns a disk source to read
619    back the written data. */
620 static struct case_source *
621 disk_sink_make_source (struct case_sink *sink) 
622 {
623   FILE *file = sink->aux;
624   
625   /* Rewind the file. */
626   assert (file != NULL);
627   if (fseek (file, 0, SEEK_SET) != 0)
628     {
629       msg (ME, _("An error occurred while attempting to rewind a "
630                  "temporary file used as the active file: %s."),
631            strerror (errno));
632       err_failure ();
633     }
634   
635   return create_case_source (&disk_source_class, file);
636 }
637
638 /* Disk sink. */
639 const struct case_sink_class disk_sink_class = 
640   {
641     "disk",
642     disk_sink_create,
643     disk_sink_write,
644     disk_sink_destroy,
645     disk_sink_make_source,
646   };
647 \f
648 /* Disk source. */
649
650 /* Reads all cases from the disk source and passes them one by one to
651    write_case(). */
652 static void
653 disk_source_read (struct case_source *source,
654                   write_case_func *write_case, write_case_data wc_data)
655 {
656   FILE *file = source->aux;
657   int i;
658
659   for (i = 0; i < vfm_source_info.ncases; i++)
660     {
661       if (!fread (temp_case, vfm_source_info.case_size, 1, file))
662         {
663           msg (ME, _("An error occurred while attempting to read from "
664                "a temporary file created for the active file: %s."),
665                strerror (errno));
666           err_failure ();
667           return;
668         }
669
670       if (!write_case (wc_data))
671         return;
672     }
673 }
674
675 /* Destroys the source's internal data. */
676 static void
677 disk_source_destroy (struct case_source *source)
678 {
679   FILE *file = source->aux;
680   if (file != NULL)
681     fclose (file);
682 }
683
684 /* Disk source. */
685 const struct case_source_class disk_source_class = 
686   {
687     "disk",
688     disk_source_read,
689     disk_source_destroy,
690   };
691 \f
692 /* Memory case stream. */
693
694 /* Memory sink data. */
695 struct memory_sink_info
696   {
697     int max_cases;              /* Maximum cases before switching to disk. */
698     struct case_list *head;     /* First case in list. */
699     struct case_list *tail;     /* Last case in list. */
700   };
701
702 /* Memory source data. */
703 struct memory_source_info 
704   {
705     struct case_list *cases;    /* List of cases. */
706   };
707
708 static void
709 memory_sink_create (struct case_sink *sink) 
710 {
711   struct memory_sink_info *info;
712   
713   sink->aux = info = xmalloc (sizeof *info);
714
715   assert (compaction_nval > 0);
716   info->max_cases = set_max_workspace / (sizeof (union value) * compaction_nval);
717   info->head = info->tail = NULL;
718 }
719
720 static void
721 memory_sink_write (struct case_sink *sink, struct ccase *c) 
722 {
723   struct memory_sink_info *info = sink->aux;
724   size_t case_size;
725   struct case_list *new_case;
726
727   case_size = sizeof (struct case_list)
728                       + ((compaction_nval - 1) * sizeof (union value));
729   new_case = malloc (case_size);
730
731   /* If we've got memory to spare then add it to the linked list. */
732   if (vfm_sink_info.ncases <= info->max_cases && new_case != NULL)
733     {
734       /* Append case to linked list. */
735       new_case->next = NULL;
736       if (info->head != NULL)
737         info->tail->next = new_case;
738       else
739         info->head = new_case;
740       info->tail = new_case;
741
742       /* Copy data into case. */
743       if (compaction_necessary)
744         compact_case (&new_case->c, c);
745       else
746         memcpy (&new_case->c, c, sizeof (union value) * compaction_nval);
747     }
748   else
749     {
750       /* Out of memory.  Write the active file to disk. */
751       struct case_list *cur, *next;
752
753       /* Notify the user. */
754       if (!new_case)
755         msg (MW, _("Virtual memory exhausted.  Writing active file "
756                    "to disk."));
757       else
758         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
759                    "overflowed.  Writing active file to disk."),
760              set_max_workspace / 1024, info->max_cases,
761              compaction_nval * sizeof (union value));
762
763       free (new_case);
764
765       /* Switch to a disk sink. */
766       vfm_sink = create_case_sink (&disk_sink_class, NULL);
767       vfm_sink->class->open (vfm_sink);
768       workspace_overflow = 1;
769
770       /* Write the cases to disk and destroy them.  We can't call
771          vfm->sink->write() because of compaction. */
772       for (cur = info->head; cur; cur = next)
773         {
774           next = cur->next;
775           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
776                       vfm_sink->aux) != 1)
777             {
778               msg (ME, _("An error occurred while attempting to "
779                          "write to a temporary file created as the "
780                          "active file: %s."),
781                    strerror (errno));
782               err_failure ();
783             }
784           free (cur);
785         }
786
787       /* Write the current case to disk. */
788       vfm_sink->class->write (vfm_sink, c);
789     }
790 }
791
792 /* If the data is stored in memory, causes it to be written to disk.
793    To be called only *between* procedure()s, not within them. */
794 void
795 write_active_file_to_disk (void)
796 {
797   if (case_source_is_class (vfm_source, &memory_source_class))
798     {
799       struct memory_source_info *info = vfm_source->aux;
800
801       /* Switch to a disk sink. */
802       vfm_sink = create_case_sink (&disk_sink_class, NULL);
803       vfm_sink->class->open (vfm_sink);
804       workspace_overflow = 1;
805       
806       /* Write the cases to disk and destroy them.  We can't call
807          vfm->sink->write() because of compaction. */
808       {
809         struct case_list *cur, *next;
810         
811         for (cur = info->cases; cur; cur = next)
812           {
813             next = cur->next;
814             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
815                         vfm_sink->aux) != 1)
816               {
817                 msg (ME, _("An error occurred while attempting to "
818                            "write to a temporary file created as the "
819                            "active file: %s."),
820                      strerror (errno));
821                 err_failure ();
822               }
823             free (cur);
824           }
825       }
826       
827       vfm_source = vfm_sink->class->make_source (vfm_sink);
828       vfm_sink = NULL;
829     }
830 }
831
832 /* Destroy all memory sink data. */
833 static void
834 memory_sink_destroy (struct case_sink *sink)
835 {
836   struct memory_sink_info *info = sink->aux;
837   struct case_list *cur, *next;
838   
839   for (cur = info->head; cur; cur = next)
840     {
841       next = cur->next;
842       free (cur);
843     }
844   free (info);
845 }
846
847 /* Switch the memory stream from sink to source mode. */
848 static struct case_source *
849 memory_sink_make_source (struct case_sink *sink)
850 {
851   struct memory_sink_info *sink_info = sink->aux;
852   struct memory_source_info *source_info;
853
854   source_info = xmalloc (sizeof *source_info);
855   source_info->cases = sink_info->head;
856
857   free (sink_info);
858
859   return create_case_source (&memory_source_class, source_info);
860 }
861
862 const struct case_sink_class memory_sink_class = 
863   {
864     "memory",
865     memory_sink_create,
866     memory_sink_write,
867     memory_sink_destroy,
868     memory_sink_make_source,
869   };
870
871 /* Reads the case stream from memory and passes it to write_case(). */
872 static void
873 memory_source_read (struct case_source *source,
874                     write_case_func *write_case, write_case_data wc_data)
875 {
876   struct memory_source_info *info = source->aux;
877
878   while (info->cases != NULL) 
879     {
880       struct case_list *iter = info->cases;
881       info->cases = iter->next;
882       memcpy (temp_case, &iter->c, vfm_source_info.case_size);
883       free (iter);
884       
885       if (!write_case (wc_data))
886         return;
887     }
888 }
889
890 /* Destroy all memory source data. */
891 static void
892 memory_source_destroy (struct case_source *source)
893 {
894   struct memory_source_info *info = source->aux;
895   struct case_list *cur, *next;
896   
897   for (cur = info->cases; cur; cur = next)
898     {
899       next = cur->next;
900       free (cur);
901     }
902   free (info);
903 }
904
905 struct case_list *
906 memory_source_get_cases (const struct case_source *source) 
907 {
908   struct memory_source_info *info = source->aux;
909
910   return info->cases;
911 }
912
913 void
914 memory_source_set_cases (const struct case_source *source,
915                          struct case_list *cases) 
916 {
917   struct memory_source_info *info = source->aux;
918
919   info->cases = cases;
920 }
921
922 /* Memory stream. */
923 const struct case_source_class memory_source_class = 
924   {
925     "memory",
926     memory_source_read,
927     memory_source_destroy,
928   };
929 \f
930 #include "debug-print.h"
931
932 /* Add temp_case to the lag queue. */
933 static void
934 lag_case (void)
935 {
936   if (lag_count < n_lag)
937     lag_count++;
938   memcpy (lag_queue[lag_head], temp_case,
939           dict_get_case_size (temp_dict));
940   if (++lag_head >= n_lag)
941     lag_head = 0;
942 }
943
944 /* Returns a pointer to the lagged case from N_BEFORE cases before the
945    current one, or NULL if there haven't been that many cases yet. */
946 struct ccase *
947 lagged_case (int n_before)
948 {
949   assert (n_before <= n_lag);
950   if (n_before > lag_count)
951     return NULL;
952   
953   {
954     int index = lag_head - n_before;
955     if (index < 0)
956       index += n_lag;
957     return lag_queue[index];
958   }
959 }
960    
961 /* Transforms temp_case and writes it to the replacement active file
962    if advisable.  Returns nonzero if more cases can be accepted, zero
963    otherwise.  Do not call this function again after it has returned
964    zero once.  */
965 int
966 procedure_write_case (write_case_data wc_data)
967 {
968   /* Index of current transformation. */
969   int cur_trns;
970
971   /* Return value: whether it's reasonable to write any more cases. */
972   int more_cases = 1;
973
974   debug_printf ((_("transform: ")));
975
976   cur_trns = f_trns;
977   for (;;)
978     {
979       /* Output the case if this is temp_trns. */
980       if (cur_trns == temp_trns)
981         {
982           debug_printf (("REC"));
983
984           if (n_lag)
985             lag_case ();
986           
987           vfm_sink_info.ncases++;
988           vfm_sink->class->write (vfm_sink, temp_case);
989
990           if (dict_get_case_limit (default_dict))
991             more_cases = (vfm_sink_info.ncases
992                           < dict_get_case_limit (default_dict));
993         }
994
995       /* Are we done? */
996       if (cur_trns >= n_trns)
997         break;
998       
999       debug_printf (("$%d", cur_trns));
1000
1001       /* Decide which transformation should come next. */
1002       {
1003         int code;
1004         
1005         code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
1006         switch (code)
1007           {
1008           case -1:
1009             /* Next transformation. */
1010             cur_trns++;
1011             break;
1012           case -2:
1013             /* Delete this case. */
1014             goto done;
1015           default:
1016             /* Go to that transformation. */
1017             cur_trns = code;
1018             break;
1019           }
1020       }
1021     }
1022
1023   /* Call the beginning of group function. */
1024   if (!case_count && wc_data->beginfunc != NULL)
1025     wc_data->beginfunc (wc_data->aux);
1026
1027   /* Call the procedure if there is one and FILTER and PROCESS IF
1028      don't prohibit it. */
1029   if (wc_data->procfunc != NULL && !exclude_this_case ())
1030     wc_data->procfunc (temp_case, wc_data->aux);
1031
1032   case_count++;
1033   
1034 done:
1035   debug_putc ('\n', stdout);
1036
1037   clear_temp_case ();
1038   
1039   /* Return previously determined value. */
1040   return more_cases;
1041 }
1042
1043 /* Clears the variables in the temporary case that need to be
1044    cleared between processing cases.  */
1045 static void
1046 clear_temp_case (void)
1047 {
1048   /* FIXME?  This is linear in the number of variables, but
1049      doesn't need to be, so it's an easy optimization target. */
1050   size_t var_cnt = dict_get_var_cnt (default_dict);
1051   size_t i;
1052   
1053   for (i = 0; i < var_cnt; i++) 
1054     {
1055       struct variable *v = dict_get_var (default_dict, i);
1056       if (v->init && v->reinit) 
1057         {
1058           if (v->type == NUMERIC) 
1059             temp_case->data[v->fv].f = SYSMIS;
1060           else
1061             memset (temp_case->data[v->fv].s, ' ', v->width);
1062         } 
1063     }
1064 }
1065
1066 /* Returns nonzero if this case should be exclude as specified on
1067    FILTER or PROCESS IF, otherwise zero. */
1068 static int
1069 exclude_this_case (void)
1070 {
1071   /* FILTER. */
1072   struct variable *filter_var = dict_get_filter (default_dict);
1073   if (filter_var != NULL) 
1074     {
1075       double f = temp_case->data[filter_var->fv].f;
1076       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
1077         return 1;
1078     }
1079
1080   /* PROCESS IF. */
1081   if (process_if_expr != NULL
1082       && expr_evaluate (process_if_expr, temp_case, NULL) != 1.0)
1083     return 1;
1084
1085   return 0;
1086 }
1087
1088 /* Appends TRNS to t_trns[], the list of all transformations to be
1089    performed on data as it is read from the active file. */
1090 void
1091 add_transformation (struct trns_header * trns)
1092 {
1093   if (n_trns >= m_trns)
1094     {
1095       m_trns += 16;
1096       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1097     }
1098   t_trns[n_trns] = trns;
1099   trns->index = n_trns++;
1100 }
1101
1102 /* Cancels all active transformations, including any transformations
1103    created by the input program. */
1104 void
1105 cancel_transformations (void)
1106 {
1107   int i;
1108   for (i = 0; i < n_trns; i++)
1109     {
1110       if (t_trns[i]->free)
1111         t_trns[i]->free (t_trns[i]);
1112       free (t_trns[i]);
1113     }
1114   n_trns = f_trns = 0;
1115   if (m_trns > 32)
1116     {
1117       free (t_trns);
1118       m_trns = 0;
1119     }
1120 }
1121
1122 /* Dumps out the values of all the split variables for the case C. */
1123 static void
1124 dump_splits (struct ccase *c)
1125 {
1126   struct variable *const *split;
1127   struct tab_table *t;
1128   size_t split_cnt;
1129   int i;
1130
1131   split_cnt = dict_get_split_cnt (default_dict);
1132   t = tab_create (3, split_cnt + 1, 0);
1133   tab_dim (t, tab_natural_dimensions);
1134   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
1135   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
1136   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1137   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1138   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1139   split = dict_get_split_vars (default_dict);
1140   for (i = 0; i < split_cnt; i++)
1141     {
1142       struct variable *v = split[i];
1143       char temp_buf[80];
1144       const char *val_lab;
1145
1146       assert (v->type == NUMERIC || v->type == ALPHA);
1147       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1148       
1149       data_out (temp_buf, &v->print, &c->data[v->fv]);
1150       
1151       temp_buf[v->print.w] = 0;
1152       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1153
1154       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1155       if (val_lab)
1156         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1157     }
1158   tab_flags (t, SOMF_NO_TITLE);
1159   tab_submit (t);
1160 }
1161
1162 /* This procfunc is substituted for the user-supplied procfunc when
1163    SPLIT FILE is active.  This function forms a wrapper around that
1164    procfunc by dividing the input into series. */
1165 static int
1166 SPLIT_FILE_procfunc (struct ccase *c, void *data_)
1167 {
1168   struct write_case_data *data = data_;
1169   static struct ccase *prev_case;
1170   struct variable *const *split;
1171   size_t split_cnt;
1172   size_t i;
1173
1174   /* The first case always begins a new series.  We also need to
1175      preserve the values of the case for later comparison. */
1176   if (case_count == 0)
1177     {
1178       if (prev_case)
1179         free (prev_case);
1180       prev_case = xmalloc (vfm_sink_info.case_size);
1181       memcpy (prev_case, c, vfm_sink_info.case_size);
1182
1183       dump_splits (c);
1184       if (data->beginfunc != NULL)
1185         data->beginfunc (data->aux);
1186       
1187       return data->procfunc (c, data->aux);
1188     }
1189
1190   /* Compare the value of each SPLIT FILE variable to the values on
1191      the previous case. */
1192   split = dict_get_split_vars (default_dict);
1193   split_cnt = dict_get_split_cnt (default_dict);
1194   for (i = 0; i < split_cnt; i++)
1195     {
1196       struct variable *v = split[i];
1197       
1198       switch (v->type)
1199         {
1200         case NUMERIC:
1201           if (c->data[v->fv].f != prev_case->data[v->fv].f)
1202             goto not_equal;
1203           break;
1204         case ALPHA:
1205           if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
1206             goto not_equal;
1207           break;
1208         default:
1209           assert (0);
1210         }
1211     }
1212   return data->procfunc (c, data->aux);
1213   
1214 not_equal:
1215   /* The values of the SPLIT FILE variable are different from the
1216      values on the previous case.  That means that it's time to begin
1217      a new series. */
1218   if (data->endfunc != NULL)
1219     data->endfunc (data->aux);
1220   dump_splits (c);
1221   if (data->beginfunc != NULL)
1222     data->beginfunc (data->aux);
1223   memcpy (prev_case, c, vfm_sink_info.case_size);
1224   return data->procfunc (c, data->aux);
1225 }
1226 \f
1227 /* Case compaction. */
1228
1229 /* Copies case SRC to case DEST, compacting it in the process. */
1230 void
1231 compact_case (struct ccase *dest, const struct ccase *src)
1232 {
1233   int i;
1234   int nval = 0;
1235   size_t var_cnt;
1236   
1237   assert (compaction_necessary);
1238
1239   if (temporary == 2)
1240     {
1241       if (dest != compaction_case)
1242         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1243       return;
1244     }
1245
1246   /* Copy all the variables except the scratch variables from SRC to
1247      DEST. */
1248   var_cnt = dict_get_var_cnt (default_dict);
1249   for (i = 0; i < var_cnt; i++)
1250     {
1251       struct variable *v = dict_get_var (default_dict, i);
1252       
1253       if (v->name[0] == '#')
1254         continue;
1255
1256       if (v->type == NUMERIC)
1257         dest->data[nval++] = src->data[v->fv];
1258       else
1259         {
1260           int w = DIV_RND_UP (v->width, sizeof (union value));
1261           
1262           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1263           nval += w;
1264         }
1265     }
1266 }
1267
1268 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1269 static void
1270 finish_compaction (void)
1271 {
1272   int i;
1273
1274   for (i = 0; i < dict_get_var_cnt (default_dict); )
1275     {
1276       struct variable *v = dict_get_var (default_dict, i);
1277
1278       if (v->name[0] == '#') 
1279         dict_delete_var (default_dict, v);
1280       else
1281         i++;
1282     }
1283   dict_compact_values (default_dict);
1284 }
1285
1286 struct case_source *
1287 create_case_source (const struct case_source_class *class, void *aux) 
1288 {
1289   struct case_source *source = xmalloc (sizeof *source);
1290   source->class = class;
1291   source->aux = aux;
1292   return source;
1293 }
1294
1295 int
1296 case_source_is_complex (const struct case_source *source) 
1297 {
1298   return source != NULL && (source->class == &input_program_source_class
1299                             || source->class == &file_type_source_class);
1300 }
1301
1302 int
1303 case_source_is_class (const struct case_source *source,
1304                       const struct case_source_class *class) 
1305 {
1306   return source != NULL && source->class == class;
1307
1308 }
1309
1310 struct case_sink *
1311 create_case_sink (const struct case_sink_class *class, void *aux) 
1312 {
1313   struct case_sink *sink = xmalloc (sizeof *sink);
1314   sink->class = class;
1315   sink->aux = aux;
1316   return sink;
1317 }
1318