42e6b7ccd29dfef5e2b5792cbe16c705fdceaec9
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include <assert.h>
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "do-ifP.h"
32 #include "error.h"
33 #include "expr.h"
34 #include "misc.h"
35 #include "random.h"
36 #include "som.h"
37 #include "str.h"
38 #include "tab.h"
39 #include "var.h"
40 #include "value-labels.h"
41
42 /*
43    Virtual File Manager (vfm):
44
45    vfm is used to process data files.  It uses the model that data is
46    read from one stream (the data source), then written to another
47    (the data sink).  The data source is then deleted and the data sink
48    becomes the data source for the next procedure. */
49
50 #include "debug-print.h"
51
52 /* Procedure execution data. */
53 struct write_case_data
54   {
55     void (*beginfunc) (void *);
56     int (*procfunc) (struct ccase *, void *);
57     void (*endfunc) (void *);
58     void *aux;
59   };
60
61 /* This is used to read from the active file. */
62 struct case_stream *vfm_source;
63
64 /* This is used to write to the replacement active file. */
65 struct case_stream *vfm_sink;
66
67 /* Information about the data source. */
68 struct stream_info vfm_source_info;
69
70 /* Information about the data sink. */
71 struct stream_info vfm_sink_info;
72
73 /* Nonzero if the case needs to have values deleted before being
74    stored, zero otherwise. */
75 int compaction_necessary;
76
77 /* Number of values after compaction, or the same as
78    vfm_sink_info.nval, if compaction is not necessary. */
79 int compaction_nval;
80
81 /* Temporary case buffer with enough room for `compaction_nval'
82    `value's. */
83 struct ccase *compaction_case;
84
85 /* Within a session, when paging is turned on, it is never turned back
86    off.  This policy might be too aggressive. */
87 static int paging = 0;
88
89 /* Time at which vfm was last invoked. */
90 time_t last_vfm_invocation;
91
92 /* Number of cases passed to proc_func(). */
93 static int case_count;
94
95 /* Lag queue. */
96 int n_lag;                      /* Number of cases to lag. */
97 static int lag_count;           /* Number of cases in lag_queue so far. */
98 static int lag_head;            /* Index where next case will be added. */
99 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
100
101 static void open_active_file (void);
102 static void close_active_file (struct write_case_data *);
103 static int SPLIT_FILE_procfunc (struct ccase *, void *);
104 static void finish_compaction (void);
105 static void lag_case (void);
106 static int procedure_write_case (struct write_case_data *);
107 static void clear_temp_case (void);
108 static int exclude_this_case (void);
109 \f
110 /* Public functions. */
111
112 /* Reads all the cases from the active file, transforms them by
113    the active set of transformations, calls PROCFUNC with CURCASE
114    set to the case, and writes them to a new active file.
115
116    Divides the active file into zero or more series of one or more
117    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
118    called after each series.
119
120    Arbitrary user-specified data AUX is passed to BEGINFUNC,
121    PROCFUNC, and ENDFUNC as auxiliary data. */
122 void
123 procedure (void (*beginfunc) (void *),
124            int (*procfunc) (struct ccase *curcase, void *),
125            void (*endfunc) (void *),
126            void *aux)
127 {
128   static int recursive_call;
129
130   struct write_case_data procedure_write_data;
131   struct write_case_data split_file_data;
132
133   assert (++recursive_call == 1);
134
135   if (dict_get_split_cnt (default_dict) == 0) 
136     {
137       /* Normally we just use the data passed by the user. */
138       procedure_write_data.beginfunc = beginfunc;
139       procedure_write_data.procfunc = procfunc;
140       procedure_write_data.endfunc = endfunc;
141       procedure_write_data.aux = aux;
142     }
143   else
144     {
145       /* Under SPLIT FILE, we add a layer of indirection. */
146       procedure_write_data.beginfunc = NULL;
147       procedure_write_data.procfunc = SPLIT_FILE_procfunc;
148       procedure_write_data.endfunc = endfunc;
149       procedure_write_data.aux = &split_file_data;
150
151       split_file_data.beginfunc = beginfunc;
152       split_file_data.procfunc = procfunc;
153       split_file_data.endfunc = endfunc;
154       split_file_data.aux = aux;
155     }
156
157   last_vfm_invocation = time (NULL);
158
159   open_active_file ();
160   vfm_source->read (procedure_write_case, &procedure_write_data);
161   close_active_file (&procedure_write_data);
162
163   assert (--recursive_call == 0);
164 }
165 \f
166 /* Active file processing support.  Subtly different semantics from
167    procedure(). */
168
169 static int process_active_file_write_case (struct write_case_data *data);
170
171 /* The casefunc might want us to stop calling it. */
172 static int not_canceled;
173
174 /* Reads all the cases from the active file and passes them one-by-one
175    to CASEFUNC in temp_case.  Before any cases are passed, calls
176    BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
177    BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
178    file by calling process_active_file_output_case().
179
180    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
181 void
182 process_active_file (void (*beginfunc) (void *),
183                      int (*casefunc) (struct ccase *curcase, void *),
184                      void (*endfunc) (void *),
185                      void *aux)
186 {
187   struct write_case_data process_active_write_data;
188
189   process_active_write_data.beginfunc = beginfunc;
190   process_active_write_data.procfunc = casefunc;
191   process_active_write_data.endfunc = endfunc;
192   process_active_write_data.aux = aux;
193
194   not_canceled = 1;
195
196   open_active_file ();
197   beginfunc (aux);
198   
199   /* There doesn't necessarily need to be an active file. */
200   if (vfm_source)
201     vfm_source->read (process_active_file_write_case,
202                       &process_active_write_data);
203   
204   endfunc (aux);
205   close_active_file (&process_active_write_data);
206 }
207
208 /* Pass the current case to casefunc. */
209 static int
210 process_active_file_write_case (struct write_case_data *data)
211 {
212   /* Index of current transformation. */
213   int cur_trns;
214
215   for (cur_trns = f_trns ; cur_trns != temp_trns; )
216     {
217       int code;
218         
219       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
220       switch (code)
221         {
222         case -1:
223           /* Next transformation. */
224           cur_trns++;
225           break;
226         case -2:
227           /* Delete this case. */
228           goto done;
229         default:
230           /* Go to that transformation. */
231           cur_trns = code;
232           break;
233         }
234     }
235
236   if (n_lag)
237     lag_case ();
238           
239   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
240   if (not_canceled && !exclude_this_case ())
241     not_canceled = data->procfunc (temp_case, data->aux);
242   
243   case_count++;
244   
245  done:
246   clear_temp_case ();
247
248   return 1;
249 }
250
251 /* Write temp_case to the active file. */
252 void
253 process_active_file_output_case (void)
254 {
255   vfm_sink_info.ncases++;
256   vfm_sink->write ();
257 }
258 \f
259 /* Opening the active file. */
260
261 /* It might be usefully noted that the following several functions are
262    given in the order that they are called by open_active_file(). */
263
264 /* Prepare to write to the replacement active file. */
265 static void
266 prepare_for_writing (void)
267 {
268   /* FIXME: If ALL the conditions listed below hold true, then the
269      replacement active file is guaranteed to be identical to the
270      original active file:
271
272      1. TEMPORARY was the first transformation, OR, there were no
273      transformations at all.
274
275      2. Input is not coming from an input program.
276
277      3. Compaction is not necessary.
278
279      So, in this case, we shouldn't have to replace the active
280      file--it's just a waste of time and space. */
281
282   vfm_sink_info.ncases = 0;
283   vfm_sink_info.nval = dict_get_next_value_idx (default_dict);
284   vfm_sink_info.case_size = dict_get_case_size (default_dict);
285   
286   if (vfm_sink == NULL)
287     {
288       if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
289           && !paging)
290         {
291           msg (MW, _("Workspace overflow predicted.  Max workspace is "
292                      "currently set to %d KB (%d cases at %d bytes each).  "
293                      "Paging active file to disk."),
294                MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
295                vfm_sink_info.case_size);
296           
297           paging = 1;
298         }
299       
300       vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
301     }
302 }
303
304 /* Arrange for compacting the output cases for storage. */
305 static void
306 arrange_compaction (void)
307 {
308   int count_values = 0;
309
310   {
311     int i;
312     
313     /* Count up the number of `value's that will be output. */
314     for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
315       {
316         struct variable *v = dict_get_var (temp_dict, i);
317
318         if (v->name[0] != '#')
319           {
320             assert (v->nv > 0);
321             count_values += v->nv;
322           } 
323       }
324     assert (temporary == 2
325             || count_values <= dict_get_next_value_idx (temp_dict));
326   }
327   
328   /* Compaction is only necessary if the number of `value's to output
329      differs from the number already present. */
330   compaction_nval = count_values;
331   if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
332     compaction_necessary = 1;
333   else
334     compaction_necessary = 0;
335   
336   if (vfm_sink->init)
337     vfm_sink->init ();
338 }
339
340 /* Prepares the temporary case and compaction case. */
341 static void
342 make_temp_case (void)
343 {
344   temp_case = xmalloc (vfm_sink_info.case_size);
345
346   if (compaction_necessary)
347     compaction_case = xmalloc (sizeof (struct ccase)
348                                + sizeof (union value) * (compaction_nval - 1));
349 }
350
351 #if DEBUGGING
352 /* Returns the name of the variable that owns the index CCASE_INDEX
353    into ccase. */
354 static const char *
355 index_to_varname (int ccase_index)
356 {
357   int i;
358
359   for (i = 0; i < default_dict.nvar; i++)
360     {
361       struct variable *v = default_dict.var[i];
362       
363       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
364         return default_dict.var[i]->name;
365     }
366   return _("<NOVAR>");
367 }
368 #endif
369
370 /* Initializes temp_case from the vectors that say which `value's
371    need to be initialized just once, and which ones need to be
372    re-initialized before every case. */
373 static void
374 vector_initialization (void)
375 {
376   size_t var_cnt = dict_get_var_cnt (default_dict);
377   size_t i;
378   
379   for (i = 0; i < var_cnt; i++) 
380     {
381       struct variable *v = dict_get_var (default_dict, i);
382
383       if (v->type == NUMERIC) 
384         {
385           if (v->reinit)
386             temp_case->data[v->fv].f = 0.0;
387           else
388             temp_case->data[v->fv].f = SYSMIS;
389         }
390       else
391         memset (temp_case->data[v->fv].s, ' ', v->width);
392     }
393 }
394
395 /* Sets all the lag-related variables based on value of n_lag. */
396 static void
397 setup_lag (void)
398 {
399   int i;
400   
401   if (n_lag == 0)
402     return;
403
404   lag_count = 0;
405   lag_head = 0;
406   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
407   for (i = 0; i < n_lag; i++)
408     lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
409 }
410
411 /* There is a lot of potential confusion in the vfm and related
412    routines over the number of `value's at each stage of the process.
413    Here is each nval count, with explanation, as set up by
414    open_active_file():
415
416    vfm_source_info.nval: Number of `value's in the cases returned by
417    the source stream.  This value turns out not to be very useful, but
418    we maintain it anyway.
419
420    vfm_sink_info.nval: Number of `value's in the cases after all
421    transformations have been performed.  Never less than
422    vfm_source_info.nval.
423
424    temp_dict->nval: Number of `value's in the cases after the
425    transformations leading up to TEMPORARY have been performed.  If
426    TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
427    Never less than vfm_sink_info.nval.
428
429    compaction_nval: Number of `value's in the cases after the
430    transformations leading up to TEMPORARY have been performed and the
431    case has been compacted by compact_case(), if compaction is
432    necessary.  This the number of `value's in the cases saved by the
433    sink stream.  (However, note that the cases passed to the sink
434    stream have not yet been compacted.  It is the responsibility of
435    the data sink to call compact_case().)  This may be less than,
436    greater than, or equal to vfm_source_info.nval.  `compaction'
437    becomes the new value of default_dict.nval after the procedure is
438    completed.
439
440    default_dict.nval: This is often an alias for temp_dict->nval.  As
441    such it can really have no separate existence until the procedure
442    is complete.  For this reason it should *not* be referenced inside
443    the execution of a procedure. */
444 /* Makes all preparations for reading from the data source and writing
445    to the data sink. */
446 static void
447 open_active_file (void)
448 {
449   /* Sometimes we want to refer to the dictionary that applies to the
450      data actually written to the sink.  This is either temp_dict or
451      default_dict.  However, if TEMPORARY is not on, then temp_dict
452      does not apply.  So, we can set temp_dict to default_dict in this
453      case. */
454   if (!temporary)
455     {
456       temp_trns = n_trns;
457       temp_dict = default_dict;
458     }
459
460   /* No cases passed to the procedure yet. */
461   case_count = 0;
462
463   /* The rest. */
464   prepare_for_writing ();
465   arrange_compaction ();
466   make_temp_case ();
467   vector_initialization ();
468   discard_ctl_stack ();
469   setup_lag ();
470
471   /* Debug output. */
472   debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
473                  vfm_source->name, vfm_sink->name));
474   debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
475                  "temp_dict->nval=%d, compaction_nval=%d, "
476                  "default_dict.nval=%d\n",
477                  vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
478                  compaction_nval, default_dict.nval));
479 }
480 \f
481 /* Closes the active file. */
482 static void
483 close_active_file (struct write_case_data *data)
484 {
485   /* Close the current case group. */
486   if (case_count && data->endfunc != NULL)
487     data->endfunc (data->aux);
488
489   /* Stop lagging (catch up?). */
490   if (n_lag)
491     {
492       int i;
493       
494       for (i = 0; i < n_lag; i++)
495         free (lag_queue[i]);
496       free (lag_queue);
497       n_lag = 0;
498     }
499   
500   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
501      off TEMPORARY. */
502   if (temporary)
503     {
504       dict_destroy (default_dict);
505       default_dict = temp_dict;
506       temp_dict = NULL;
507     }
508
509   /* Finish compaction. */
510   if (compaction_necessary)
511     finish_compaction ();
512     
513   /* Old data sink --> New data source. */
514   if (vfm_source && vfm_source->destroy_source)
515     vfm_source->destroy_source ();
516   
517   vfm_source = vfm_sink;
518   vfm_source_info.ncases = vfm_sink_info.ncases;
519   vfm_source_info.nval = compaction_nval;
520   vfm_source_info.case_size = (sizeof (struct ccase)
521                                + (compaction_nval - 1) * sizeof (union value));
522   if (vfm_source->mode)
523     vfm_source->mode ();
524
525   /* Old data sink is gone now. */
526   vfm_sink = NULL;
527
528   /* Cancel TEMPORARY. */
529   cancel_temporary ();
530
531   /* Free temporary cases. */
532   free (temp_case);
533   temp_case = NULL;
534
535   free (compaction_case);
536   compaction_case = NULL;
537
538   /* Cancel PROCESS IF. */
539   expr_free (process_if_expr);
540   process_if_expr = NULL;
541
542   /* Cancel FILTER if temporary. */
543   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
544     dict_set_filter (default_dict, NULL);
545
546   /* Cancel transformations. */
547   cancel_transformations ();
548
549   /* Turn off case limiter. */
550   dict_set_case_limit (default_dict, 0);
551
552   /* Clear VECTOR vectors. */
553   dict_clear_vectors (default_dict);
554
555   debug_printf (("vfm: procedure complete\n\n"));
556 }
557 \f
558 /* Disk case stream. */
559
560 /* Associated files. */
561 FILE *disk_source_file;
562 FILE *disk_sink_file;
563
564 /* Initializes the disk sink. */
565 static void
566 disk_stream_init (void)
567 {
568   disk_sink_file = tmpfile ();
569   if (!disk_sink_file)
570     {
571       msg (ME, _("An error occurred attempting to create a temporary "
572                  "file for use as the active file: %s."),
573            strerror (errno));
574       err_failure ();
575     }
576 }
577
578 /* Reads all cases from the disk source and passes them one by one to
579    write_case(). */
580 static void
581 disk_stream_read (write_case_func *write_case, write_case_data wc_data)
582 {
583   int i;
584
585   for (i = 0; i < vfm_source_info.ncases; i++)
586     {
587       if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
588         {
589           msg (ME, _("An error occurred while attempting to read from "
590                "a temporary file created for the active file: %s."),
591                strerror (errno));
592           err_failure ();
593           return;
594         }
595
596       if (!write_case (wc_data))
597         return;
598     }
599 }
600
601 /* Writes temp_case to the disk sink. */
602 static void
603 disk_stream_write (void)
604 {
605   union value *src_case;
606
607   if (compaction_necessary)
608     {
609       compact_case (compaction_case, temp_case);
610       src_case = (union value *) compaction_case;
611     }
612   else src_case = (union value *) temp_case;
613
614   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
615               disk_sink_file) != 1)
616     {
617       msg (ME, _("An error occurred while attempting to write to a "
618                  "temporary file used as the active file: %s."),
619            strerror (errno));
620       err_failure ();
621     }
622 }
623
624 /* Switches the stream from a sink to a source. */
625 static void
626 disk_stream_mode (void)
627 {
628   /* Rewind the sink. */
629   if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
630     {
631       msg (ME, _("An error occurred while attempting to rewind a "
632                  "temporary file used as the active file: %s."),
633            strerror (errno));
634       err_failure ();
635     }
636   
637   /* Sink --> source variables. */
638   disk_source_file = disk_sink_file;
639 }
640
641 /* Destroys the source's internal data. */
642 static void
643 disk_stream_destroy_source (void)
644 {
645   if (disk_source_file)
646     {
647       fclose (disk_source_file);
648       disk_source_file = NULL;
649     }
650 }
651
652 /* Destroys the sink's internal data. */
653 static void
654 disk_stream_destroy_sink (void)
655 {
656   if (disk_sink_file)
657     {
658       fclose (disk_sink_file);
659       disk_sink_file = NULL;
660     }
661 }
662
663 /* Disk stream. */
664 struct case_stream vfm_disk_stream = 
665   {
666     disk_stream_init,
667     disk_stream_read,
668     disk_stream_write,
669     disk_stream_mode,
670     disk_stream_destroy_source,
671     disk_stream_destroy_sink,
672     "disk",
673   };
674 \f
675 /* Memory case stream. */
676
677 /* List of cases stored in the stream. */
678 struct case_list *memory_source_cases;
679 struct case_list *memory_sink_cases;
680
681 /* Current case. */
682 struct case_list *memory_sink_iter;
683
684 /* Maximum number of cases. */
685 int memory_sink_max_cases;
686
687 /* Initializes the memory stream variables for writing. */
688 static void
689 memory_stream_init (void)
690 {
691   memory_sink_cases = NULL;
692   memory_sink_iter = NULL;
693   
694   assert (compaction_nval);
695   memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
696 }
697
698 /* Reads the case stream from memory and passes it to write_case(). */
699 static void
700 memory_stream_read (write_case_func *write_case, write_case_data wc_data)
701 {
702   while (memory_source_cases != NULL)
703     {
704       memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
705       
706       {
707         struct case_list *current = memory_source_cases;
708         memory_source_cases = memory_source_cases->next;
709         free (current);
710       }
711       
712       if (!write_case (wc_data))
713         return;
714     }
715 }
716
717 /* Writes temp_case to the memory stream. */
718 static void
719 memory_stream_write (void)
720 {
721   struct case_list *new_case = malloc (sizeof (struct case_list)
722                                        + ((compaction_nval - 1)
723                                           * sizeof (union value)));
724
725   /* If we've got memory to spare then add it to the linked list. */
726   if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
727     {
728       if (compaction_necessary)
729         compact_case (&new_case->c, temp_case);
730       else
731         memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
732
733       /* Append case to linked list. */
734       if (memory_sink_cases)
735         memory_sink_iter = memory_sink_iter->next = new_case;
736       else
737         memory_sink_iter = memory_sink_cases = new_case;
738     }
739   else
740     {
741       /* Out of memory.  Write the active file to disk. */
742       struct case_list *cur, *next;
743
744       /* Notify the user. */
745       if (!new_case)
746         msg (MW, _("Virtual memory exhausted.  Paging active file "
747                    "to disk."));
748       else
749         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
750                    "overflowed.  Paging active file to disk."),
751              MAX_WORKSPACE / 1024, memory_sink_max_cases,
752              compaction_nval * sizeof (union value));
753
754       free (new_case);
755
756       /* Switch to a disk sink. */
757       vfm_sink = &vfm_disk_stream;
758       vfm_sink->init ();
759       paging = 1;
760
761       /* Terminate the list. */
762       if (memory_sink_iter)
763         memory_sink_iter->next = NULL;
764
765       /* Write the cases to disk and destroy them.  We can't call
766          vfm->sink->write() because of compaction. */
767       for (cur = memory_sink_cases; cur; cur = next)
768         {
769           next = cur->next;
770           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
771                       disk_sink_file) != 1)
772             {
773               msg (ME, _("An error occurred while attempting to "
774                          "write to a temporary file created as the "
775                          "active file, while paging to disk: %s."),
776                    strerror (errno));
777               err_failure ();
778             }
779           free (cur);
780         }
781
782       /* Write the current case to disk. */
783       vfm_sink->write ();
784     }
785 }
786
787 /* If the data is stored in memory, causes it to be written to disk.
788    To be called only *between* procedure()s, not within them. */
789 void
790 page_to_disk (void)
791 {
792   if (vfm_source == &vfm_memory_stream)
793     {
794       /* Switch to a disk sink. */
795       vfm_sink = &vfm_disk_stream;
796       vfm_sink->init ();
797       paging = 1;
798       
799       /* Write the cases to disk and destroy them.  We can't call
800          vfm->sink->write() because of compaction. */
801       {
802         struct case_list *cur, *next;
803         
804         for (cur = memory_source_cases; cur; cur = next)
805           {
806             next = cur->next;
807             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
808                         disk_sink_file) != 1)
809               {
810                 msg (ME, _("An error occurred while attempting to "
811                            "write to a temporary file created as the "
812                            "active file, while paging to disk: %s."),
813                      strerror (errno));
814                 err_failure ();
815               }
816             free (cur);
817           }
818       }
819       
820       vfm_source = &vfm_disk_stream;
821       vfm_source->mode ();
822
823       vfm_sink = NULL;
824     }
825 }
826
827 /* Switch the memory stream from sink to source mode. */
828 static void
829 memory_stream_mode (void)
830 {
831   /* Terminate the list. */
832   if (memory_sink_iter)
833     memory_sink_iter->next = NULL;
834
835   /* Sink --> source variables. */
836   memory_source_cases = memory_sink_cases;
837   memory_sink_cases = NULL;
838 }
839
840 /* Destroy all memory source data. */
841 static void
842 memory_stream_destroy_source (void)
843 {
844   struct case_list *cur, *next;
845   
846   for (cur = memory_source_cases; cur; cur = next)
847     {
848       next = cur->next;
849       free (cur);
850     }
851   memory_source_cases = NULL;
852 }
853
854 /* Destroy all memory sink data. */
855 static void
856 memory_stream_destroy_sink (void)
857 {
858   struct case_list *cur, *next;
859   
860   for (cur = memory_sink_cases; cur; cur = next)
861     {
862       next = cur->next;
863       free (cur);
864     }
865   memory_sink_cases = NULL;
866 }
867   
868 /* Memory stream. */
869 struct case_stream vfm_memory_stream = 
870   {
871     memory_stream_init,
872     memory_stream_read,
873     memory_stream_write,
874     memory_stream_mode,
875     memory_stream_destroy_source,
876     memory_stream_destroy_sink,
877     "memory",
878   };
879 \f
880 #include "debug-print.h"
881
882 /* Add temp_case to the lag queue. */
883 static void
884 lag_case (void)
885 {
886   if (lag_count < n_lag)
887     lag_count++;
888   memcpy (lag_queue[lag_head], temp_case,
889           dict_get_case_size (temp_dict));
890   if (++lag_head >= n_lag)
891     lag_head = 0;
892 }
893
894 /* Returns a pointer to the lagged case from N_BEFORE cases before the
895    current one, or NULL if there haven't been that many cases yet. */
896 struct ccase *
897 lagged_case (int n_before)
898 {
899   assert (n_before <= n_lag);
900   if (n_before > lag_count)
901     return NULL;
902   
903   {
904     int index = lag_head - n_before;
905     if (index < 0)
906       index += n_lag;
907     return lag_queue[index];
908   }
909 }
910    
911 /* Transforms temp_case and writes it to the replacement active file
912    if advisable.  Returns nonzero if more cases can be accepted, zero
913    otherwise.  Do not call this function again after it has returned
914    zero once.  */
915 int
916 procedure_write_case (write_case_data wc_data)
917 {
918   /* Index of current transformation. */
919   int cur_trns;
920
921   /* Return value: whether it's reasonable to write any more cases. */
922   int more_cases = 1;
923
924   debug_printf ((_("transform: ")));
925
926   cur_trns = f_trns;
927   for (;;)
928     {
929       /* Output the case if this is temp_trns. */
930       if (cur_trns == temp_trns)
931         {
932           debug_printf (("REC"));
933
934           if (n_lag)
935             lag_case ();
936           
937           vfm_sink_info.ncases++;
938           vfm_sink->write ();
939
940           if (dict_get_case_limit (default_dict))
941             more_cases = (vfm_sink_info.ncases
942                           < dict_get_case_limit (default_dict));
943         }
944
945       /* Are we done? */
946       if (cur_trns >= n_trns)
947         break;
948       
949       debug_printf (("$%d", cur_trns));
950
951       /* Decide which transformation should come next. */
952       {
953         int code;
954         
955         code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
956         switch (code)
957           {
958           case -1:
959             /* Next transformation. */
960             cur_trns++;
961             break;
962           case -2:
963             /* Delete this case. */
964             goto done;
965           default:
966             /* Go to that transformation. */
967             cur_trns = code;
968             break;
969           }
970       }
971     }
972
973   /* Call the beginning of group function. */
974   if (!case_count && wc_data->beginfunc != NULL)
975     wc_data->beginfunc (wc_data->aux);
976
977   /* Call the procedure if there is one and FILTER and PROCESS IF
978      don't prohibit it. */
979   if (wc_data->procfunc != NULL && !exclude_this_case ())
980     wc_data->procfunc (temp_case, wc_data->aux);
981
982   case_count++;
983   
984 done:
985   debug_putc ('\n', stdout);
986
987   clear_temp_case ();
988   
989   /* Return previously determined value. */
990   return more_cases;
991 }
992
993 /* Clears the variables in the temporary case that need to be
994    cleared between processing cases.  */
995 static void
996 clear_temp_case (void)
997 {
998   /* FIXME?  This is linear in the number of variables, but
999      doesn't need to be, so it's an easy optimization target. */
1000   size_t var_cnt = dict_get_var_cnt (default_dict);
1001   size_t i;
1002   
1003   for (i = 0; i < var_cnt; i++) 
1004     {
1005       struct variable *v = dict_get_var (default_dict, i);
1006       if (v->init && v->reinit) 
1007         {
1008           if (v->type == NUMERIC) 
1009             temp_case->data[v->fv].f = SYSMIS;
1010           else
1011             memset (temp_case->data[v->fv].s, ' ', v->width);
1012         } 
1013     }
1014 }
1015
1016 /* Returns nonzero if this case should be exclude as specified on
1017    FILTER or PROCESS IF, otherwise zero. */
1018 static int
1019 exclude_this_case (void)
1020 {
1021   /* FILTER. */
1022   struct variable *filter_var = dict_get_filter (default_dict);
1023   if (filter_var != NULL) 
1024     {
1025       double f = temp_case->data[filter_var->fv].f;
1026       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
1027         return 1;
1028     }
1029
1030   /* PROCESS IF. */
1031   if (process_if_expr != NULL
1032       && expr_evaluate (process_if_expr, temp_case, NULL) != 1.0)
1033     return 1;
1034
1035   return 0;
1036 }
1037
1038 /* Appends TRNS to t_trns[], the list of all transformations to be
1039    performed on data as it is read from the active file. */
1040 void
1041 add_transformation (struct trns_header * trns)
1042 {
1043   if (n_trns >= m_trns)
1044     {
1045       m_trns += 16;
1046       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1047     }
1048   t_trns[n_trns] = trns;
1049   trns->index = n_trns++;
1050 }
1051
1052 /* Cancels all active transformations, including any transformations
1053    created by the input program. */
1054 void
1055 cancel_transformations (void)
1056 {
1057   int i;
1058   for (i = 0; i < n_trns; i++)
1059     {
1060       if (t_trns[i]->free)
1061         t_trns[i]->free (t_trns[i]);
1062       free (t_trns[i]);
1063     }
1064   n_trns = f_trns = 0;
1065   if (m_trns > 32)
1066     {
1067       free (t_trns);
1068       m_trns = 0;
1069     }
1070 }
1071
1072 /* Dumps out the values of all the split variables for the case C. */
1073 static void
1074 dump_splits (struct ccase *c)
1075 {
1076   struct variable *const *split;
1077   struct tab_table *t;
1078   size_t split_cnt;
1079   int i;
1080
1081   split_cnt = dict_get_split_cnt (default_dict);
1082   t = tab_create (3, split_cnt + 1, 0);
1083   tab_dim (t, tab_natural_dimensions);
1084   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
1085   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
1086   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1087   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1088   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1089   split = dict_get_split_vars (default_dict);
1090   for (i = 0; i < split_cnt; i++)
1091     {
1092       struct variable *v = split[i];
1093       char temp_buf[80];
1094       const char *val_lab;
1095
1096       assert (v->type == NUMERIC || v->type == ALPHA);
1097       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1098       
1099       data_out (temp_buf, &v->print, &c->data[v->fv]);
1100       
1101       temp_buf[v->print.w] = 0;
1102       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1103
1104       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1105       if (val_lab)
1106         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1107     }
1108   tab_flags (t, SOMF_NO_TITLE);
1109   tab_submit (t);
1110 }
1111
1112 /* This procfunc is substituted for the user-supplied procfunc when
1113    SPLIT FILE is active.  This function forms a wrapper around that
1114    procfunc by dividing the input into series. */
1115 static int
1116 SPLIT_FILE_procfunc (struct ccase *c, void *data_)
1117 {
1118   struct write_case_data *data = data_;
1119   static struct ccase *prev_case;
1120   struct variable *const *split;
1121   size_t split_cnt;
1122   size_t i;
1123
1124   /* The first case always begins a new series.  We also need to
1125      preserve the values of the case for later comparison. */
1126   if (case_count == 0)
1127     {
1128       if (prev_case)
1129         free (prev_case);
1130       prev_case = xmalloc (vfm_sink_info.case_size);
1131       memcpy (prev_case, c, vfm_sink_info.case_size);
1132
1133       dump_splits (c);
1134       if (data->beginfunc != NULL)
1135         data->beginfunc (data->aux);
1136       
1137       return data->procfunc (c, data->aux);
1138     }
1139
1140   /* Compare the value of each SPLIT FILE variable to the values on
1141      the previous case. */
1142   split = dict_get_split_vars (default_dict);
1143   split_cnt = dict_get_split_cnt (default_dict);
1144   for (i = 0; i < split_cnt; i++)
1145     {
1146       struct variable *v = split[i];
1147       
1148       switch (v->type)
1149         {
1150         case NUMERIC:
1151           if (c->data[v->fv].f != prev_case->data[v->fv].f)
1152             goto not_equal;
1153           break;
1154         case ALPHA:
1155           if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
1156             goto not_equal;
1157           break;
1158         default:
1159           assert (0);
1160         }
1161     }
1162   return data->procfunc (c, data->aux);
1163   
1164 not_equal:
1165   /* The values of the SPLIT FILE variable are different from the
1166      values on the previous case.  That means that it's time to begin
1167      a new series. */
1168   if (data->endfunc != NULL)
1169     data->endfunc (data->aux);
1170   dump_splits (c);
1171   if (data->beginfunc != NULL)
1172     data->beginfunc (data->aux);
1173   memcpy (prev_case, c, vfm_sink_info.case_size);
1174   return data->procfunc (c, data->aux);
1175 }
1176 \f
1177 /* Case compaction. */
1178
1179 /* Copies case SRC to case DEST, compacting it in the process. */
1180 void
1181 compact_case (struct ccase *dest, const struct ccase *src)
1182 {
1183   int i;
1184   int nval = 0;
1185   size_t var_cnt;
1186   
1187   assert (compaction_necessary);
1188
1189   if (temporary == 2)
1190     {
1191       if (dest != compaction_case)
1192         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1193       return;
1194     }
1195
1196   /* Copy all the variables except the scratch variables from SRC to
1197      DEST. */
1198   var_cnt = dict_get_var_cnt (default_dict);
1199   for (i = 0; i < var_cnt; i++)
1200     {
1201       struct variable *v = dict_get_var (default_dict, i);
1202       
1203       if (v->name[0] == '#')
1204         continue;
1205
1206       if (v->type == NUMERIC)
1207         dest->data[nval++] = src->data[v->fv];
1208       else
1209         {
1210           int w = DIV_RND_UP (v->width, sizeof (union value));
1211           
1212           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1213           nval += w;
1214         }
1215     }
1216 }
1217
1218 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1219 static void
1220 finish_compaction (void)
1221 {
1222   int i;
1223
1224   for (i = 0; i < dict_get_var_cnt (default_dict); )
1225     {
1226       struct variable *v = dict_get_var (default_dict, i);
1227
1228       if (v->name[0] == '#') 
1229         dict_delete_var (default_dict, v);
1230       else
1231         i++;
1232     }
1233   dict_compact_values (default_dict);
1234 }
1235
1236