Clean up struct dictionary's value_cnt usage.
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include <assert.h>
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "approx.h"
32 #include "do-ifP.h"
33 #include "error.h"
34 #include "expr.h"
35 #include "misc.h"
36 #include "random.h"
37 #include "som.h"
38 #include "str.h"
39 #include "tab.h"
40 #include "var.h"
41 #include "value-labels.h"
42
43 /*
44    Virtual File Manager (vfm):
45
46    vfm is used to process data files.  It uses the model that data is
47    read from one stream (the data source), then written to another
48    (the data sink).  The data source is then deleted and the data sink
49    becomes the data source for the next procedure. */
50
51 #include "debug-print.h"
52
53 /* Procedure execution data. */
54 struct write_case_data
55   {
56     void (*beginfunc) (void *);
57     int (*procfunc) (struct ccase *, void *);
58     void (*endfunc) (void *);
59     void *aux;
60   };
61
62 /* This is used to read from the active file. */
63 struct case_stream *vfm_source;
64
65 /* This is used to write to the replacement active file. */
66 struct case_stream *vfm_sink;
67
68 /* Information about the data source. */
69 struct stream_info vfm_source_info;
70
71 /* Information about the data sink. */
72 struct stream_info vfm_sink_info;
73
74 /* Nonzero if the case needs to have values deleted before being
75    stored, zero otherwise. */
76 int compaction_necessary;
77
78 /* Number of values after compaction, or the same as
79    vfm_sink_info.nval, if compaction is not necessary. */
80 int compaction_nval;
81
82 /* Temporary case buffer with enough room for `compaction_nval'
83    `value's. */
84 struct ccase *compaction_case;
85
86 /* Within a session, when paging is turned on, it is never turned back
87    off.  This policy might be too aggressive. */
88 static int paging = 0;
89
90 /* Time at which vfm was last invoked. */
91 time_t last_vfm_invocation;
92
93 /* Number of cases passed to proc_func(). */
94 static int case_count;
95
96 /* Lag queue. */
97 int n_lag;                      /* Number of cases to lag. */
98 static int lag_count;           /* Number of cases in lag_queue so far. */
99 static int lag_head;            /* Index where next case will be added. */
100 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
101
102 static void open_active_file (void);
103 static void close_active_file (struct write_case_data *);
104 static int SPLIT_FILE_procfunc (struct ccase *, void *);
105 static void finish_compaction (void);
106 static void lag_case (void);
107 static int procedure_write_case (struct write_case_data *);
108 static void clear_temp_case (void);
109 static int exclude_this_case (void);
110 \f
111 /* Public functions. */
112
113 /* Reads all the cases from the active file, transforms them by
114    the active set of transformations, calls PROCFUNC with CURCASE
115    set to the case, and writes them to a new active file.
116
117    Divides the active file into zero or more series of one or more
118    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
119    called after each series.
120
121    Arbitrary user-specified data AUX is passed to BEGINFUNC,
122    PROCFUNC, and ENDFUNC as auxiliary data. */
123 void
124 procedure (void (*beginfunc) (void *),
125            int (*procfunc) (struct ccase *curcase, void *),
126            void (*endfunc) (void *),
127            void *aux)
128 {
129   struct write_case_data procedure_write_data;
130   struct write_case_data split_file_data;
131
132   if (dict_get_split_cnt (default_dict) == 0) 
133     {
134       /* Normally we just use the data passed by the user. */
135       procedure_write_data.beginfunc = beginfunc;
136       procedure_write_data.procfunc = procfunc;
137       procedure_write_data.endfunc = endfunc;
138       procedure_write_data.aux = aux;
139     }
140   else
141     {
142       /* Under SPLIT FILE, we add a layer of indirection. */
143       procedure_write_data.beginfunc = NULL;
144       procedure_write_data.procfunc = SPLIT_FILE_procfunc;
145       procedure_write_data.endfunc = endfunc;
146       procedure_write_data.aux = &split_file_data;
147
148       split_file_data.beginfunc = beginfunc;
149       split_file_data.procfunc = procfunc;
150       split_file_data.endfunc = endfunc;
151       split_file_data.aux = aux;
152     }
153
154   last_vfm_invocation = time (NULL);
155
156   open_active_file ();
157   vfm_source->read (procedure_write_case, &procedure_write_data);
158   close_active_file (&procedure_write_data);
159 }
160 \f
161 /* Active file processing support.  Subtly different semantics from
162    procedure(). */
163
164 static int process_active_file_write_case (struct write_case_data *data);
165
166 /* The casefunc might want us to stop calling it. */
167 static int not_canceled;
168
169 /* Reads all the cases from the active file and passes them one-by-one
170    to CASEFUNC in temp_case.  Before any cases are passed, calls
171    BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
172    BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
173    file by calling process_active_file_output_case().
174
175    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
176 void
177 process_active_file (void (*beginfunc) (void *),
178                      int (*casefunc) (struct ccase *curcase, void *),
179                      void (*endfunc) (void *),
180                      void *aux)
181 {
182   struct write_case_data process_active_write_data;
183
184   process_active_write_data.beginfunc = beginfunc;
185   process_active_write_data.procfunc = casefunc;
186   process_active_write_data.endfunc = endfunc;
187   process_active_write_data.aux = aux;
188
189   not_canceled = 1;
190
191   open_active_file ();
192   beginfunc (aux);
193   
194   /* There doesn't necessarily need to be an active file. */
195   if (vfm_source)
196     vfm_source->read (process_active_file_write_case,
197                       &process_active_write_data);
198   
199   endfunc (aux);
200   close_active_file (&process_active_write_data);
201 }
202
203 /* Pass the current case to casefunc. */
204 static int
205 process_active_file_write_case (struct write_case_data *data)
206 {
207   /* Index of current transformation. */
208   int cur_trns;
209
210   for (cur_trns = f_trns ; cur_trns != temp_trns; )
211     {
212       int code;
213         
214       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
215       switch (code)
216         {
217         case -1:
218           /* Next transformation. */
219           cur_trns++;
220           break;
221         case -2:
222           /* Delete this case. */
223           goto done;
224         default:
225           /* Go to that transformation. */
226           cur_trns = code;
227           break;
228         }
229     }
230
231   if (n_lag)
232     lag_case ();
233           
234   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
235   if (not_canceled && !exclude_this_case ())
236     not_canceled = data->procfunc (temp_case, data->aux);
237   
238   case_count++;
239   
240  done:
241   clear_temp_case ();
242
243   return 1;
244 }
245
246 /* Write temp_case to the active file. */
247 void
248 process_active_file_output_case (void)
249 {
250   vfm_sink_info.ncases++;
251   vfm_sink->write ();
252 }
253 \f
254 /* Opening the active file. */
255
256 /* It might be usefully noted that the following several functions are
257    given in the order that they are called by open_active_file(). */
258
259 /* Prepare to write to the replacement active file. */
260 static void
261 prepare_for_writing (void)
262 {
263   /* FIXME: If ALL the conditions listed below hold true, then the
264      replacement active file is guaranteed to be identical to the
265      original active file:
266
267      1. TEMPORARY was the first transformation, OR, there were no
268      transformations at all.
269
270      2. Input is not coming from an input program.
271
272      3. Compaction is not necessary.
273
274      So, in this case, we shouldn't have to replace the active
275      file--it's just a waste of time and space. */
276
277   vfm_sink_info.ncases = 0;
278   vfm_sink_info.nval = dict_get_next_value_idx (default_dict);
279   vfm_sink_info.case_size = dict_get_case_size (default_dict);
280   
281   if (vfm_sink == NULL)
282     {
283       if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
284           && !paging)
285         {
286           msg (MW, _("Workspace overflow predicted.  Max workspace is "
287                      "currently set to %d KB (%d cases at %d bytes each).  "
288                      "Paging active file to disk."),
289                MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
290                vfm_sink_info.case_size);
291           
292           paging = 1;
293         }
294       
295       vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
296     }
297 }
298
299 /* Arrange for compacting the output cases for storage. */
300 static void
301 arrange_compaction (void)
302 {
303   int count_values = 0;
304
305   {
306     int i;
307     
308     /* Count up the number of `value's that will be output. */
309     for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
310       {
311         struct variable *v = dict_get_var (temp_dict, i);
312
313         if (v->name[0] != '#')
314           {
315             assert (v->nv > 0);
316             count_values += v->nv;
317           } 
318       }
319     assert (temporary == 2
320             || count_values <= dict_get_next_value_idx (temp_dict));
321   }
322   
323   /* Compaction is only necessary if the number of `value's to output
324      differs from the number already present. */
325   compaction_nval = count_values;
326   if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
327     compaction_necessary = 1;
328   else
329     compaction_necessary = 0;
330   
331   if (vfm_sink->init)
332     vfm_sink->init ();
333 }
334
335 /* Prepares the temporary case and compaction case. */
336 static void
337 make_temp_case (void)
338 {
339   temp_case = xmalloc (vfm_sink_info.case_size);
340
341   if (compaction_necessary)
342     compaction_case = xmalloc (sizeof (struct ccase)
343                                + sizeof (union value) * (compaction_nval - 1));
344 }
345
346 #if DEBUGGING
347 /* Returns the name of the variable that owns the index CCASE_INDEX
348    into ccase. */
349 static const char *
350 index_to_varname (int ccase_index)
351 {
352   int i;
353
354   for (i = 0; i < default_dict.nvar; i++)
355     {
356       struct variable *v = default_dict.var[i];
357       
358       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
359         return default_dict.var[i]->name;
360     }
361   return _("<NOVAR>");
362 }
363 #endif
364
365 /* Initializes temp_case from the vectors that say which `value's
366    need to be initialized just once, and which ones need to be
367    re-initialized before every case. */
368 static void
369 vector_initialization (void)
370 {
371   size_t var_cnt = dict_get_var_cnt (default_dict);
372   size_t i;
373   
374   for (i = 0; i < var_cnt; i++) 
375     {
376       struct variable *v = dict_get_var (default_dict, i);
377
378       if (v->type == NUMERIC) 
379         {
380           if (v->reinit)
381             temp_case->data[v->fv].f = 0.0;
382           else
383             temp_case->data[v->fv].f = SYSMIS;
384         }
385       else
386         memset (temp_case->data[v->fv].s, ' ', v->width);
387     }
388 }
389
390 /* Sets all the lag-related variables based on value of n_lag. */
391 static void
392 setup_lag (void)
393 {
394   int i;
395   
396   if (n_lag == 0)
397     return;
398
399   lag_count = 0;
400   lag_head = 0;
401   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
402   for (i = 0; i < n_lag; i++)
403     lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
404 }
405
406 /* There is a lot of potential confusion in the vfm and related
407    routines over the number of `value's at each stage of the process.
408    Here is each nval count, with explanation, as set up by
409    open_active_file():
410
411    vfm_source_info.nval: Number of `value's in the cases returned by
412    the source stream.  This value turns out not to be very useful, but
413    we maintain it anyway.
414
415    vfm_sink_info.nval: Number of `value's in the cases after all
416    transformations have been performed.  Never less than
417    vfm_source_info.nval.
418
419    temp_dict->nval: Number of `value's in the cases after the
420    transformations leading up to TEMPORARY have been performed.  If
421    TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
422    Never less than vfm_sink_info.nval.
423
424    compaction_nval: Number of `value's in the cases after the
425    transformations leading up to TEMPORARY have been performed and the
426    case has been compacted by compact_case(), if compaction is
427    necessary.  This the number of `value's in the cases saved by the
428    sink stream.  (However, note that the cases passed to the sink
429    stream have not yet been compacted.  It is the responsibility of
430    the data sink to call compact_case().)  This may be less than,
431    greater than, or equal to vfm_source_info.nval.  `compaction'
432    becomes the new value of default_dict.nval after the procedure is
433    completed.
434
435    default_dict.nval: This is often an alias for temp_dict->nval.  As
436    such it can really have no separate existence until the procedure
437    is complete.  For this reason it should *not* be referenced inside
438    the execution of a procedure. */
439 /* Makes all preparations for reading from the data source and writing
440    to the data sink. */
441 static void
442 open_active_file (void)
443 {
444   /* Sometimes we want to refer to the dictionary that applies to the
445      data actually written to the sink.  This is either temp_dict or
446      default_dict.  However, if TEMPORARY is not on, then temp_dict
447      does not apply.  So, we can set temp_dict to default_dict in this
448      case. */
449   if (!temporary)
450     {
451       temp_trns = n_trns;
452       temp_dict = default_dict;
453     }
454
455   /* No cases passed to the procedure yet. */
456   case_count = 0;
457
458   /* The rest. */
459   prepare_for_writing ();
460   arrange_compaction ();
461   make_temp_case ();
462   vector_initialization ();
463   discard_ctl_stack ();
464   setup_lag ();
465
466   /* Debug output. */
467   debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
468                  vfm_source->name, vfm_sink->name));
469   debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
470                  "temp_dict->nval=%d, compaction_nval=%d, "
471                  "default_dict.nval=%d\n",
472                  vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
473                  compaction_nval, default_dict.nval));
474 }
475 \f
476 /* Closes the active file. */
477 static void
478 close_active_file (struct write_case_data *data)
479 {
480   /* Close the current case group. */
481   if (case_count && data->endfunc != NULL)
482     data->endfunc (data->aux);
483
484   /* Stop lagging (catch up?). */
485   if (n_lag)
486     {
487       int i;
488       
489       for (i = 0; i < n_lag; i++)
490         free (lag_queue[i]);
491       free (lag_queue);
492       n_lag = 0;
493     }
494   
495   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
496      off TEMPORARY. */
497   if (temporary)
498     {
499       dict_destroy (default_dict);
500       default_dict = temp_dict;
501       temp_dict = NULL;
502     }
503
504   /* Finish compaction. */
505   if (compaction_necessary)
506     finish_compaction ();
507     
508   /* Old data sink --> New data source. */
509   if (vfm_source && vfm_source->destroy_source)
510     vfm_source->destroy_source ();
511   
512   vfm_source = vfm_sink;
513   vfm_source_info.ncases = vfm_sink_info.ncases;
514   vfm_source_info.nval = compaction_nval;
515   vfm_source_info.case_size = (sizeof (struct ccase)
516                                + (compaction_nval - 1) * sizeof (union value));
517   if (vfm_source->mode)
518     vfm_source->mode ();
519
520   /* Old data sink is gone now. */
521   vfm_sink = NULL;
522
523   /* Cancel TEMPORARY. */
524   cancel_temporary ();
525
526   /* Free temporary cases. */
527   free (temp_case);
528   temp_case = NULL;
529
530   free (compaction_case);
531   compaction_case = NULL;
532
533   /* Cancel PROCESS IF. */
534   expr_free (process_if_expr);
535   process_if_expr = NULL;
536
537   /* Cancel FILTER if temporary. */
538   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
539     dict_set_filter (default_dict, NULL);
540
541   /* Cancel transformations. */
542   cancel_transformations ();
543
544   /* Turn off case limiter. */
545   dict_set_case_limit (default_dict, 0);
546
547   /* Clear VECTOR vectors. */
548   dict_clear_vectors (default_dict);
549
550   debug_printf (("vfm: procedure complete\n\n"));
551 }
552 \f
553 /* Disk case stream. */
554
555 /* Associated files. */
556 FILE *disk_source_file;
557 FILE *disk_sink_file;
558
559 /* Initializes the disk sink. */
560 static void
561 disk_stream_init (void)
562 {
563   disk_sink_file = tmpfile ();
564   if (!disk_sink_file)
565     {
566       msg (ME, _("An error occurred attempting to create a temporary "
567                  "file for use as the active file: %s."),
568            strerror (errno));
569       err_failure ();
570     }
571 }
572
573 /* Reads all cases from the disk source and passes them one by one to
574    write_case(). */
575 static void
576 disk_stream_read (write_case_func *write_case, write_case_data wc_data)
577 {
578   int i;
579
580   for (i = 0; i < vfm_source_info.ncases; i++)
581     {
582       if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
583         {
584           msg (ME, _("An error occurred while attempting to read from "
585                "a temporary file created for the active file: %s."),
586                strerror (errno));
587           err_failure ();
588           return;
589         }
590
591       if (!write_case (wc_data))
592         return;
593     }
594 }
595
596 /* Writes temp_case to the disk sink. */
597 static void
598 disk_stream_write (void)
599 {
600   union value *src_case;
601
602   if (compaction_necessary)
603     {
604       compact_case (compaction_case, temp_case);
605       src_case = (union value *) compaction_case;
606     }
607   else src_case = (union value *) temp_case;
608
609   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
610               disk_sink_file) != 1)
611     {
612       msg (ME, _("An error occurred while attempting to write to a "
613                  "temporary file used as the active file: %s."),
614            strerror (errno));
615       err_failure ();
616     }
617 }
618
619 /* Switches the stream from a sink to a source. */
620 static void
621 disk_stream_mode (void)
622 {
623   /* Rewind the sink. */
624   if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
625     {
626       msg (ME, _("An error occurred while attempting to rewind a "
627                  "temporary file used as the active file: %s."),
628            strerror (errno));
629       err_failure ();
630     }
631   
632   /* Sink --> source variables. */
633   disk_source_file = disk_sink_file;
634 }
635
636 /* Destroys the source's internal data. */
637 static void
638 disk_stream_destroy_source (void)
639 {
640   if (disk_source_file)
641     {
642       fclose (disk_source_file);
643       disk_source_file = NULL;
644     }
645 }
646
647 /* Destroys the sink's internal data. */
648 static void
649 disk_stream_destroy_sink (void)
650 {
651   if (disk_sink_file)
652     {
653       fclose (disk_sink_file);
654       disk_sink_file = NULL;
655     }
656 }
657
658 /* Disk stream. */
659 struct case_stream vfm_disk_stream = 
660   {
661     disk_stream_init,
662     disk_stream_read,
663     disk_stream_write,
664     disk_stream_mode,
665     disk_stream_destroy_source,
666     disk_stream_destroy_sink,
667     "disk",
668   };
669 \f
670 /* Memory case stream. */
671
672 /* List of cases stored in the stream. */
673 struct case_list *memory_source_cases;
674 struct case_list *memory_sink_cases;
675
676 /* Current case. */
677 struct case_list *memory_sink_iter;
678
679 /* Maximum number of cases. */
680 int memory_sink_max_cases;
681
682 /* Initializes the memory stream variables for writing. */
683 static void
684 memory_stream_init (void)
685 {
686   memory_sink_cases = NULL;
687   memory_sink_iter = NULL;
688   
689   assert (compaction_nval);
690   memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
691 }
692
693 /* Reads the case stream from memory and passes it to write_case(). */
694 static void
695 memory_stream_read (write_case_func *write_case, write_case_data wc_data)
696 {
697   while (memory_source_cases != NULL)
698     {
699       memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
700       
701       {
702         struct case_list *current = memory_source_cases;
703         memory_source_cases = memory_source_cases->next;
704         free (current);
705       }
706       
707       if (!write_case (wc_data))
708         return;
709     }
710 }
711
712 /* Writes temp_case to the memory stream. */
713 static void
714 memory_stream_write (void)
715 {
716   struct case_list *new_case = malloc (sizeof (struct case_list)
717                                        + ((compaction_nval - 1)
718                                           * sizeof (union value)));
719
720   /* If we've got memory to spare then add it to the linked list. */
721   if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
722     {
723       if (compaction_necessary)
724         compact_case (&new_case->c, temp_case);
725       else
726         memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
727
728       /* Append case to linked list. */
729       if (memory_sink_cases)
730         memory_sink_iter = memory_sink_iter->next = new_case;
731       else
732         memory_sink_iter = memory_sink_cases = new_case;
733     }
734   else
735     {
736       /* Out of memory.  Write the active file to disk. */
737       struct case_list *cur, *next;
738
739       /* Notify the user. */
740       if (!new_case)
741         msg (MW, _("Virtual memory exhausted.  Paging active file "
742                    "to disk."));
743       else
744         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
745                    "overflowed.  Paging active file to disk."),
746              MAX_WORKSPACE / 1024, memory_sink_max_cases,
747              compaction_nval * sizeof (union value));
748
749       free (new_case);
750
751       /* Switch to a disk sink. */
752       vfm_sink = &vfm_disk_stream;
753       vfm_sink->init ();
754       paging = 1;
755
756       /* Terminate the list. */
757       if (memory_sink_iter)
758         memory_sink_iter->next = NULL;
759
760       /* Write the cases to disk and destroy them.  We can't call
761          vfm->sink->write() because of compaction. */
762       for (cur = memory_sink_cases; cur; cur = next)
763         {
764           next = cur->next;
765           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
766                       disk_sink_file) != 1)
767             {
768               msg (ME, _("An error occurred while attempting to "
769                          "write to a temporary file created as the "
770                          "active file, while paging to disk: %s."),
771                    strerror (errno));
772               err_failure ();
773             }
774           free (cur);
775         }
776
777       /* Write the current case to disk. */
778       vfm_sink->write ();
779     }
780 }
781
782 /* If the data is stored in memory, causes it to be written to disk.
783    To be called only *between* procedure()s, not within them. */
784 void
785 page_to_disk (void)
786 {
787   if (vfm_source == &vfm_memory_stream)
788     {
789       /* Switch to a disk sink. */
790       vfm_sink = &vfm_disk_stream;
791       vfm_sink->init ();
792       paging = 1;
793       
794       /* Write the cases to disk and destroy them.  We can't call
795          vfm->sink->write() because of compaction. */
796       {
797         struct case_list *cur, *next;
798         
799         for (cur = memory_source_cases; cur; cur = next)
800           {
801             next = cur->next;
802             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
803                         disk_sink_file) != 1)
804               {
805                 msg (ME, _("An error occurred while attempting to "
806                            "write to a temporary file created as the "
807                            "active file, while paging to disk: %s."),
808                      strerror (errno));
809                 err_failure ();
810               }
811             free (cur);
812           }
813       }
814       
815       vfm_source = &vfm_disk_stream;
816       vfm_source->mode ();
817
818       vfm_sink = NULL;
819     }
820 }
821
822 /* Switch the memory stream from sink to source mode. */
823 static void
824 memory_stream_mode (void)
825 {
826   /* Terminate the list. */
827   if (memory_sink_iter)
828     memory_sink_iter->next = NULL;
829
830   /* Sink --> source variables. */
831   memory_source_cases = memory_sink_cases;
832   memory_sink_cases = NULL;
833 }
834
835 /* Destroy all memory source data. */
836 static void
837 memory_stream_destroy_source (void)
838 {
839   struct case_list *cur, *next;
840   
841   for (cur = memory_source_cases; cur; cur = next)
842     {
843       next = cur->next;
844       free (cur);
845     }
846   memory_source_cases = NULL;
847 }
848
849 /* Destroy all memory sink data. */
850 static void
851 memory_stream_destroy_sink (void)
852 {
853   struct case_list *cur, *next;
854   
855   for (cur = memory_sink_cases; cur; cur = next)
856     {
857       next = cur->next;
858       free (cur);
859     }
860   memory_sink_cases = NULL;
861 }
862   
863 /* Memory stream. */
864 struct case_stream vfm_memory_stream = 
865   {
866     memory_stream_init,
867     memory_stream_read,
868     memory_stream_write,
869     memory_stream_mode,
870     memory_stream_destroy_source,
871     memory_stream_destroy_sink,
872     "memory",
873   };
874 \f
875 #include "debug-print.h"
876
877 /* Add temp_case to the lag queue. */
878 static void
879 lag_case (void)
880 {
881   if (lag_count < n_lag)
882     lag_count++;
883   memcpy (lag_queue[lag_head], temp_case,
884           dict_get_case_size (temp_dict));
885   if (++lag_head >= n_lag)
886     lag_head = 0;
887 }
888
889 /* Returns a pointer to the lagged case from N_BEFORE cases before the
890    current one, or NULL if there haven't been that many cases yet. */
891 struct ccase *
892 lagged_case (int n_before)
893 {
894   assert (n_before <= n_lag);
895   if (n_before > lag_count)
896     return NULL;
897   
898   {
899     int index = lag_head - n_before;
900     if (index < 0)
901       index += n_lag;
902     return lag_queue[index];
903   }
904 }
905    
906 /* Transforms temp_case and writes it to the replacement active file
907    if advisable.  Returns nonzero if more cases can be accepted, zero
908    otherwise.  Do not call this function again after it has returned
909    zero once.  */
910 int
911 procedure_write_case (write_case_data wc_data)
912 {
913   /* Index of current transformation. */
914   int cur_trns;
915
916   /* Return value: whether it's reasonable to write any more cases. */
917   int more_cases = 1;
918
919   debug_printf ((_("transform: ")));
920
921   cur_trns = f_trns;
922   for (;;)
923     {
924       /* Output the case if this is temp_trns. */
925       if (cur_trns == temp_trns)
926         {
927           debug_printf (("REC"));
928
929           if (n_lag)
930             lag_case ();
931           
932           vfm_sink_info.ncases++;
933           vfm_sink->write ();
934
935           if (dict_get_case_limit (default_dict))
936             more_cases = (vfm_sink_info.ncases
937                           < dict_get_case_limit (default_dict));
938         }
939
940       /* Are we done? */
941       if (cur_trns >= n_trns)
942         break;
943       
944       debug_printf (("$%d", cur_trns));
945
946       /* Decide which transformation should come next. */
947       {
948         int code;
949         
950         code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
951         switch (code)
952           {
953           case -1:
954             /* Next transformation. */
955             cur_trns++;
956             break;
957           case -2:
958             /* Delete this case. */
959             goto done;
960           default:
961             /* Go to that transformation. */
962             cur_trns = code;
963             break;
964           }
965       }
966     }
967
968   /* Call the beginning of group function. */
969   if (!case_count && wc_data->beginfunc != NULL)
970     wc_data->beginfunc (wc_data->aux);
971
972   /* Call the procedure if there is one and FILTER and PROCESS IF
973      don't prohibit it. */
974   if (wc_data->procfunc != NULL && !exclude_this_case ())
975     wc_data->procfunc (temp_case, wc_data->aux);
976
977   case_count++;
978   
979 done:
980   debug_putc ('\n', stdout);
981
982   clear_temp_case ();
983   
984   /* Return previously determined value. */
985   return more_cases;
986 }
987
988 /* Clears the variables in the temporary case that need to be
989    cleared between processing cases.  */
990 static void
991 clear_temp_case (void)
992 {
993   /* FIXME?  This is linear in the number of variables, but
994      doesn't need to be, so it's an easy optimization target. */
995   size_t var_cnt = dict_get_var_cnt (default_dict);
996   size_t i;
997   
998   for (i = 0; i < var_cnt; i++) 
999     {
1000       struct variable *v = dict_get_var (default_dict, i);
1001       if (v->init && v->reinit) 
1002         {
1003           if (v->type == NUMERIC) 
1004             temp_case->data[v->fv].f = SYSMIS;
1005           else
1006             memset (temp_case->data[v->fv].s, ' ', v->width);
1007         } 
1008     }
1009 }
1010
1011 /* Returns nonzero if this case should be exclude as specified on
1012    FILTER or PROCESS IF, otherwise zero. */
1013 static int
1014 exclude_this_case (void)
1015 {
1016   /* FILTER. */
1017   struct variable *filter_var = dict_get_filter (default_dict);
1018   if (filter_var != NULL) 
1019     {
1020       double f = temp_case->data[filter_var->fv].f;
1021       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
1022         return 1;
1023     }
1024
1025   /* PROCESS IF. */
1026   if (process_if_expr != NULL
1027       && expr_evaluate (process_if_expr, temp_case, NULL) != 1.0)
1028     return 1;
1029
1030   return 0;
1031 }
1032
1033 /* Appends TRNS to t_trns[], the list of all transformations to be
1034    performed on data as it is read from the active file. */
1035 void
1036 add_transformation (struct trns_header * trns)
1037 {
1038   if (n_trns >= m_trns)
1039     {
1040       m_trns += 16;
1041       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1042     }
1043   t_trns[n_trns] = trns;
1044   trns->index = n_trns++;
1045 }
1046
1047 /* Cancels all active transformations, including any transformations
1048    created by the input program. */
1049 void
1050 cancel_transformations (void)
1051 {
1052   int i;
1053   for (i = 0; i < n_trns; i++)
1054     {
1055       if (t_trns[i]->free)
1056         t_trns[i]->free (t_trns[i]);
1057       free (t_trns[i]);
1058     }
1059   n_trns = f_trns = 0;
1060   if (m_trns > 32)
1061     {
1062       free (t_trns);
1063       m_trns = 0;
1064     }
1065 }
1066
1067 /* Dumps out the values of all the split variables for the case C. */
1068 static void
1069 dump_splits (struct ccase *c)
1070 {
1071   struct variable *const *split;
1072   struct tab_table *t;
1073   size_t split_cnt;
1074   int i;
1075
1076   split_cnt = dict_get_split_cnt (default_dict);
1077   t = tab_create (3, split_cnt + 1, 0);
1078   tab_dim (t, tab_natural_dimensions);
1079   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
1080   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
1081   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1082   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1083   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1084   split = dict_get_split_vars (default_dict);
1085   for (i = 0; i < split_cnt; i++)
1086     {
1087       struct variable *v = split[i];
1088       char temp_buf[80];
1089       const char *val_lab;
1090
1091       assert (v->type == NUMERIC || v->type == ALPHA);
1092       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1093       
1094       {
1095         union value val = c->data[v->fv];
1096         if (v->type == ALPHA)
1097           val.c = c->data[v->fv].s;
1098         data_out (temp_buf, &v->print, &val);
1099       }
1100       
1101       temp_buf[v->print.w] = 0;
1102       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1103
1104       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1105       if (val_lab)
1106         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1107     }
1108   tab_flags (t, SOMF_NO_TITLE);
1109   tab_submit (t);
1110 }
1111
1112 /* This procfunc is substituted for the user-supplied procfunc when
1113    SPLIT FILE is active.  This function forms a wrapper around that
1114    procfunc by dividing the input into series. */
1115 static int
1116 SPLIT_FILE_procfunc (struct ccase *c, void *data_)
1117 {
1118   struct write_case_data *data = data_;
1119   static struct ccase *prev_case;
1120   struct variable *const *split;
1121   size_t split_cnt;
1122   size_t i;
1123
1124   /* The first case always begins a new series.  We also need to
1125      preserve the values of the case for later comparison. */
1126   if (case_count == 0)
1127     {
1128       if (prev_case)
1129         free (prev_case);
1130       prev_case = xmalloc (vfm_sink_info.case_size);
1131       memcpy (prev_case, c, vfm_sink_info.case_size);
1132
1133       dump_splits (c);
1134       if (data->beginfunc != NULL)
1135         data->beginfunc (data->aux);
1136       
1137       return data->procfunc (c, data->aux);
1138     }
1139
1140   /* Compare the value of each SPLIT FILE variable to the values on
1141      the previous case. */
1142   split = dict_get_split_vars (default_dict);
1143   split_cnt = dict_get_split_cnt (default_dict);
1144   for (i = 0; i < split_cnt; i++)
1145     {
1146       struct variable *v = split[i];
1147       
1148       switch (v->type)
1149         {
1150         case NUMERIC:
1151           if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
1152             goto not_equal;
1153           break;
1154         case ALPHA:
1155           if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
1156             goto not_equal;
1157           break;
1158         default:
1159           assert (0);
1160         }
1161     }
1162   return data->procfunc (c, data->aux);
1163   
1164 not_equal:
1165   /* The values of the SPLIT FILE variable are different from the
1166      values on the previous case.  That means that it's time to begin
1167      a new series. */
1168   if (data->endfunc != NULL)
1169     data->endfunc (data->aux);
1170   dump_splits (c);
1171   if (data->beginfunc != NULL)
1172     data->beginfunc (data->aux);
1173   memcpy (prev_case, c, vfm_sink_info.case_size);
1174   return data->procfunc (c, data->aux);
1175 }
1176 \f
1177 /* Case compaction. */
1178
1179 /* Copies case SRC to case DEST, compacting it in the process. */
1180 void
1181 compact_case (struct ccase *dest, const struct ccase *src)
1182 {
1183   int i;
1184   int nval = 0;
1185   size_t var_cnt;
1186   
1187   assert (compaction_necessary);
1188
1189   if (temporary == 2)
1190     {
1191       if (dest != compaction_case)
1192         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1193       return;
1194     }
1195
1196   /* Copy all the variables except the scratch variables from SRC to
1197      DEST. */
1198   var_cnt = dict_get_var_cnt (default_dict);
1199   for (i = 0; i < var_cnt; i++)
1200     {
1201       struct variable *v = dict_get_var (default_dict, i);
1202       
1203       if (v->name[0] == '#')
1204         continue;
1205
1206       if (v->type == NUMERIC)
1207         dest->data[nval++] = src->data[v->fv];
1208       else
1209         {
1210           int w = DIV_RND_UP (v->width, sizeof (union value));
1211           
1212           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1213           nval += w;
1214         }
1215     }
1216 }
1217
1218 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1219 static void
1220 finish_compaction (void)
1221 {
1222   int i;
1223
1224   for (i = 0; i < dict_get_var_cnt (default_dict); )
1225     {
1226       struct variable *v = dict_get_var (default_dict, i);
1227
1228       if (v->name[0] == '#') 
1229         dict_delete_var (default_dict, v);
1230       else
1231         i++;
1232     }
1233   dict_compact_values (default_dict);
1234 }
1235
1236