a847ea65f25d5285e714eb881eab9f82faa2711c
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include <assert.h>
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "do-ifP.h"
32 #include "error.h"
33 #include "expr.h"
34 #include "misc.h"
35 #include "random.h"
36 #include "settings.h"
37 #include "som.h"
38 #include "str.h"
39 #include "tab.h"
40 #include "var.h"
41 #include "value-labels.h"
42
43 /*
44    Virtual File Manager (vfm):
45
46    vfm is used to process data files.  It uses the model that
47    data is read from one stream (the data source), processed,
48    then written to another (the data sink).  The data source is
49    then deleted and the data sink becomes the data source for the
50    next procedure. */
51
52 /* Procedure execution data. */
53 struct write_case_data
54   {
55     /* Functions to call... */
56     void (*begin_func) (void *);               /* ...before data. */
57     int (*proc_func) (struct ccase *, void *); /* ...with data. */
58     void (*end_func) (void *);                 /* ...after data. */
59     void *func_aux;                            /* Auxiliary data. */ 
60
61     /* Extra auxiliary data. */
62     void *aux;
63   };
64
65 /* The current active file, from which cases are read. */
66 struct case_source *vfm_source;
67
68 /* The replacement active file, to which cases are written. */
69 struct case_sink *vfm_sink;
70
71 /* Nonzero if the case needs to have values deleted before being
72    stored, zero otherwise. */
73 int compaction_necessary;
74
75 /* Number of values after compaction. */
76 int compaction_nval;
77
78 /* Temporary case buffer with enough room for `compaction_nval'
79    `value's. */
80 struct ccase *compaction_case;
81
82 /* Nonzero means that we've overflowed our allotted workspace.
83    After that happens once during a session, we always store the
84    active file on disk instead of in memory.  (This policy may be
85    too aggressive.) */
86 static int workspace_overflow = 0;
87
88 /* Time at which vfm was last invoked. */
89 time_t last_vfm_invocation;
90
91 /* Number of cases passed to proc_func(). */
92 static int case_count;
93
94 /* Lag queue. */
95 int n_lag;                      /* Number of cases to lag. */
96 static int lag_count;           /* Number of cases in lag_queue so far. */
97 static int lag_head;            /* Index where next case will be added. */
98 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
99
100 static void open_active_file (void);
101 static void close_active_file (struct write_case_data *);
102 static int SPLIT_FILE_proc_func (struct ccase *, void *);
103 static void finish_compaction (void);
104 static void lag_case (void);
105 static int procedure_write_case (struct write_case_data *);
106 static void clear_temp_case (void);
107 static int exclude_this_case (int case_num);
108 \f
109 /* Public functions. */
110
111 struct procedure_aux_data 
112   {
113     size_t cases_written;       /* Number of cases written so far. */
114   };
115
116 struct split_aux_data 
117   {
118     struct ccase *prev_case;    /* Data in previous case. */
119   };
120
121 /* Reads all the cases from the active file, transforms them by
122    the active set of transformations, calls PROC_FUNC with CURCASE
123    set to the case, and writes them to a new active file.
124
125    Divides the active file into zero or more series of one or more
126    cases each.  BEGIN_FUNC is called before each series.  END_FUNC is
127    called after each series.
128
129    Arbitrary user-specified data AUX is passed to BEGIN_FUNC,
130    PROC_FUNC, and END_FUNC as auxiliary data. */
131 void
132 procedure (void (*begin_func) (void *),
133            int (*proc_func) (struct ccase *curcase, void *),
134            void (*end_func) (void *),
135            void *func_aux)
136 {
137   static int recursive_call;
138
139   struct write_case_data procedure_write_data;
140   struct procedure_aux_data proc_aux;
141
142   struct write_case_data split_file_data;
143   struct split_aux_data split_aux;
144   int split;
145
146   assert (++recursive_call == 1);
147
148   proc_aux.cases_written = 0;
149
150   /* Normally we just use the data passed by the user. */
151   procedure_write_data.begin_func = begin_func;
152   procedure_write_data.proc_func = proc_func;
153   procedure_write_data.end_func = end_func;
154   procedure_write_data.func_aux = func_aux;
155   procedure_write_data.aux = &proc_aux;
156
157   /* Under SPLIT FILE, we add a layer of indirection. */
158   split = dict_get_split_cnt (default_dict) > 0;
159   if (split) 
160     {
161       split_file_data = procedure_write_data;
162       split_file_data.aux = &split_aux;
163
164       split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
165
166       procedure_write_data.begin_func = NULL;
167       procedure_write_data.proc_func = SPLIT_FILE_proc_func;
168       procedure_write_data.end_func = end_func;
169       procedure_write_data.func_aux = &split_file_data;
170     }
171
172   last_vfm_invocation = time (NULL);
173
174   open_active_file ();
175   if (vfm_source != NULL) 
176     vfm_source->class->read (vfm_source,
177                              procedure_write_case, &procedure_write_data);
178   close_active_file (&procedure_write_data);
179
180   if (split)
181     free (split_aux.prev_case);
182
183   assert (--recursive_call == 0);
184 }
185 \f
186 /* Active file processing support.  Subtly different semantics from
187    procedure(). */
188
189 static int process_active_file_write_case (struct write_case_data *data);
190
191 /* The casefunc might want us to stop calling it. */
192 static int not_canceled;
193
194 /* Reads all the cases from the active file and passes them one-by-one
195    to CASEFUNC in temp_case.  Before any cases are passed, calls
196    BEGIN_FUNC.  After all the cases have been passed, calls END_FUNC.
197    BEGIN_FUNC, CASEFUNC, and END_FUNC can write temp_case to the output
198    file by calling process_active_file_output_case().
199
200    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
201 void
202 process_active_file (void (*begin_func) (void *),
203                      int (*casefunc) (struct ccase *curcase, void *),
204                      void (*end_func) (void *),
205                      void *func_aux)
206 {
207   struct write_case_data process_active_write_data;
208
209   process_active_write_data.begin_func = begin_func;
210   process_active_write_data.proc_func = casefunc;
211   process_active_write_data.end_func = end_func;
212   process_active_write_data.func_aux = func_aux;
213
214   not_canceled = 1;
215
216   open_active_file ();
217   begin_func (func_aux);
218   if (vfm_source != NULL)
219     vfm_source->class->read (vfm_source, process_active_file_write_case,
220                              &process_active_write_data);
221   end_func (func_aux);
222   close_active_file (&process_active_write_data);
223 }
224
225 /* Pass the current case to casefunc. */
226 static int
227 process_active_file_write_case (struct write_case_data *data)
228 {
229   /* Index of current transformation. */
230   int cur_trns;
231
232   for (cur_trns = f_trns ; cur_trns != temp_trns; )
233     {
234       int code;
235         
236       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case,
237                                      case_count + 1);
238       switch (code)
239         {
240         case -1:
241           /* Next transformation. */
242           cur_trns++;
243           break;
244         case -2:
245           /* Delete this case. */
246           goto done;
247         default:
248           /* Go to that transformation. */
249           cur_trns = code;
250           break;
251         }
252     }
253
254   if (n_lag)
255     lag_case ();
256           
257   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
258   if (not_canceled && !exclude_this_case (case_count + 1))
259     not_canceled = data->proc_func (temp_case, data->func_aux);
260   
261   case_count++;
262   
263  done:
264   clear_temp_case ();
265
266   return 1;
267 }
268
269 /* Write temp_case to the active file. */
270 void
271 process_active_file_output_case (void)
272 {
273   vfm_sink->class->write (vfm_sink, temp_case);
274 }
275 \f
276 /* Opening the active file. */
277
278 /* It might be usefully noted that the following several functions are
279    given in the order that they are called by open_active_file(). */
280
281 /* Prepare to write to the replacement active file. */
282 static void
283 prepare_for_writing (void)
284 {
285   /* FIXME: If ALL the conditions listed below hold true, then the
286      replacement active file is guaranteed to be identical to the
287      original active file:
288
289      1. TEMPORARY was the first transformation, OR, there were no
290      transformations at all.
291
292      2. Input is not coming from an input program.
293
294      3. Compaction is not necessary.
295
296      So, in this case, we shouldn't have to replace the active
297      file--it's just a waste of time and space. */
298
299   if (vfm_sink == NULL)
300     {
301       if (workspace_overflow)
302         vfm_sink = create_case_sink (&disk_sink_class, NULL);
303       else
304         vfm_sink = create_case_sink (&memory_sink_class, NULL);
305     }
306 }
307
308 /* Arrange for compacting the output cases for storage. */
309 static void
310 arrange_compaction (void)
311 {
312   int count_values = 0;
313
314   {
315     int i;
316     
317     /* Count up the number of `value's that will be output. */
318     for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
319       {
320         struct variable *v = dict_get_var (temp_dict, i);
321
322         if (v->name[0] != '#')
323           {
324             assert (v->nv > 0);
325             count_values += v->nv;
326           } 
327       }
328     assert (temporary == 2
329             || count_values <= dict_get_next_value_idx (temp_dict));
330   }
331   
332   /* Compaction is only necessary if the number of `value's to output
333      differs from the number already present. */
334   compaction_nval = count_values;
335   if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
336     compaction_necessary = 1;
337   else
338     compaction_necessary = 0;
339   
340   if (vfm_sink->class->open != NULL)
341     vfm_sink->class->open (vfm_sink);
342 }
343
344 /* Prepares the temporary case and compaction case. */
345 static void
346 make_temp_case (void)
347 {
348   temp_case = xmalloc (dict_get_case_size (default_dict));
349
350   if (compaction_necessary)
351     compaction_case = xmalloc (sizeof (struct ccase)
352                                + sizeof (union value) * (compaction_nval - 1));
353 }
354
355 #if DEBUGGING
356 /* Returns the name of the variable that owns the index CCASE_INDEX
357    into ccase. */
358 static const char *
359 index_to_varname (int ccase_index)
360 {
361   int i;
362
363   for (i = 0; i < default_dict.nvar; i++)
364     {
365       struct variable *v = default_dict.var[i];
366       
367       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
368         return default_dict.var[i]->name;
369     }
370   return _("<NOVAR>");
371 }
372 #endif
373
374 /* Initializes temp_case from the vectors that say which `value's
375    need to be initialized just once, and which ones need to be
376    re-initialized before every case. */
377 static void
378 vector_initialization (void)
379 {
380   size_t var_cnt = dict_get_var_cnt (default_dict);
381   size_t i;
382   
383   for (i = 0; i < var_cnt; i++) 
384     {
385       struct variable *v = dict_get_var (default_dict, i);
386
387       if (v->type == NUMERIC) 
388         {
389           if (v->reinit)
390             temp_case->data[v->fv].f = 0.0;
391           else
392             temp_case->data[v->fv].f = SYSMIS;
393         }
394       else
395         memset (temp_case->data[v->fv].s, ' ', v->width);
396     }
397 }
398
399 /* Sets all the lag-related variables based on value of n_lag. */
400 static void
401 setup_lag (void)
402 {
403   int i;
404   
405   if (n_lag == 0)
406     return;
407
408   lag_count = 0;
409   lag_head = 0;
410   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
411   for (i = 0; i < n_lag; i++)
412     lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
413 }
414
415 /* There is a lot of potential confusion in the vfm and related
416    routines over the number of `value's at each stage of the process.
417    Here is each nval count, with explanation, as set up by
418    open_active_file():
419
420    temp_dict->nval: Number of `value's in the cases after the
421    transformations leading up to TEMPORARY have been performed.
422
423    compaction_nval: Number of `value's in the cases after the
424    transformations leading up to TEMPORARY have been performed
425    and the case has been compacted by compact_case(), if
426    compaction is necessary.  This the number of `value's in the
427    cases saved by the sink stream.  (However, note that the cases
428    passed to the sink stream have not yet been compacted.  It is
429    the responsibility of the data sink to call compact_case().)
430    `compaction' becomes the new value of default_dict.nval after
431    the procedure is completed.
432
433    default_dict.nval: This is often an alias for temp_dict->nval.
434    As such it can really have no separate existence until the
435    procedure is complete.  For this reason it should *not* be
436    referenced inside the execution of a procedure. */
437 /* Makes all preparations for reading from the data source and writing
438    to the data sink. */
439 static void
440 open_active_file (void)
441 {
442   /* Sometimes we want to refer to the dictionary that applies to the
443      data actually written to the sink.  This is either temp_dict or
444      default_dict.  However, if TEMPORARY is not on, then temp_dict
445      does not apply.  So, we can set temp_dict to default_dict in this
446      case. */
447   if (!temporary)
448     {
449       temp_trns = n_trns;
450       temp_dict = default_dict;
451     }
452
453   /* No cases passed to the procedure yet. */
454   case_count = 0;
455
456   /* The rest. */
457   prepare_for_writing ();
458   arrange_compaction ();
459   make_temp_case ();
460   vector_initialization ();
461   discard_ctl_stack ();
462   setup_lag ();
463 }
464 \f
465 /* Closes the active file. */
466 static void
467 close_active_file (struct write_case_data *data)
468 {
469   /* Close the current case group. */
470   if (case_count && data->end_func != NULL)
471     data->end_func (data->func_aux);
472
473   /* Stop lagging (catch up?). */
474   if (n_lag)
475     {
476       int i;
477       
478       for (i = 0; i < n_lag; i++)
479         free (lag_queue[i]);
480       free (lag_queue);
481       n_lag = 0;
482     }
483   
484   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
485      off TEMPORARY. */
486   if (temporary)
487     {
488       dict_destroy (default_dict);
489       default_dict = temp_dict;
490       temp_dict = NULL;
491     }
492
493   /* Finish compaction. */
494   if (compaction_necessary)
495     finish_compaction ();
496     
497   /* Old data sink --> New data source. */
498   if (vfm_source != NULL) 
499     {
500       if (vfm_source->class->destroy != NULL)
501         vfm_source->class->destroy (vfm_source);
502       free (vfm_source);
503     }
504
505   vfm_source = vfm_sink->class->make_source (vfm_sink);
506
507   /* Old data sink is gone now. */
508   free (vfm_sink);
509   vfm_sink = NULL;
510
511   /* Cancel TEMPORARY. */
512   cancel_temporary ();
513
514   /* Free temporary cases. */
515   free (temp_case);
516   temp_case = NULL;
517
518   free (compaction_case);
519   compaction_case = NULL;
520
521   /* Cancel PROCESS IF. */
522   expr_free (process_if_expr);
523   process_if_expr = NULL;
524
525   /* Cancel FILTER if temporary. */
526   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
527     dict_set_filter (default_dict, NULL);
528
529   /* Cancel transformations. */
530   cancel_transformations ();
531
532   /* Turn off case limiter. */
533   dict_set_case_limit (default_dict, 0);
534
535   /* Clear VECTOR vectors. */
536   dict_clear_vectors (default_dict);
537 }
538 \f
539 /* Disk case stream. */
540
541 /* Information about disk sink or source. */
542 struct disk_stream_info 
543   {
544     FILE *file;                 /* Output file. */
545     size_t case_cnt;            /* Number of cases written so far. */
546     size_t case_size;           /* Number of bytes in case. */
547   };
548
549 /* Initializes the disk sink. */
550 static void
551 disk_sink_create (struct case_sink *sink)
552 {
553   struct disk_stream_info *info = xmalloc (sizeof *info);
554   info->file = tmpfile ();
555   info->case_cnt = 0;
556   info->case_size = compaction_nval;
557   sink->aux = info;
558   if (info->file == NULL)
559     {
560       msg (ME, _("An error occurred attempting to create a temporary "
561                  "file for use as the active file: %s."),
562            strerror (errno));
563       err_failure ();
564     }
565 }
566
567 /* Writes temp_case to the disk sink. */
568 static void
569 disk_sink_write (struct case_sink *sink, struct ccase *c)
570 {
571   struct disk_stream_info *info = sink->aux;
572   union value *src_case;
573
574   if (compaction_necessary)
575     {
576       compact_case (compaction_case, c);
577       src_case = compaction_case->data;
578     }
579   else src_case = c->data;
580
581   info->case_cnt++;
582   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
583               info->file) != 1)
584     {
585       msg (ME, _("An error occurred while attempting to write to a "
586                  "temporary file used as the active file: %s."),
587            strerror (errno));
588       err_failure ();
589     }
590 }
591
592 /* Destroys the sink's internal data. */
593 static void
594 disk_sink_destroy (struct case_sink *sink)
595 {
596   struct disk_stream_info *info = sink->aux;
597   if (info->file != NULL)
598     fclose (info->file);
599 }
600
601 /* Closes and destroys the sink and returns a disk source to read
602    back the written data. */
603 static struct case_source *
604 disk_sink_make_source (struct case_sink *sink) 
605 {
606   struct disk_stream_info *info = sink->aux;
607     
608   /* Rewind the file. */
609   assert (info->file != NULL);
610   if (fseek (info->file, 0, SEEK_SET) != 0)
611     {
612       msg (ME, _("An error occurred while attempting to rewind a "
613                  "temporary file used as the active file: %s."),
614            strerror (errno));
615       err_failure ();
616     }
617   
618   return create_case_source (&disk_source_class, info);
619 }
620
621 /* Disk sink. */
622 const struct case_sink_class disk_sink_class = 
623   {
624     "disk",
625     disk_sink_create,
626     disk_sink_write,
627     disk_sink_destroy,
628     disk_sink_make_source,
629   };
630 \f
631 /* Disk source. */
632
633 /* Returns the number of cases that will be read by
634    disk_source_read(). */
635 static int
636 disk_source_count (const struct case_source *source) 
637 {
638   struct disk_stream_info *info = source->aux;
639
640   return info->case_cnt;
641 }
642
643 /* Reads all cases from the disk source and passes them one by one to
644    write_case(). */
645 static void
646 disk_source_read (struct case_source *source,
647                   write_case_func *write_case, write_case_data wc_data)
648 {
649   struct disk_stream_info *info = source->aux;
650   int i;
651
652   for (i = 0; i < info->case_cnt; i++)
653     {
654       if (!fread (temp_case, info->case_size, 1, info->file))
655         {
656           msg (ME, _("An error occurred while attempting to read from "
657                "a temporary file created for the active file: %s."),
658                strerror (errno));
659           err_failure ();
660           return;
661         }
662
663       if (!write_case (wc_data))
664         return;
665     }
666 }
667
668 /* Destroys the source's internal data. */
669 static void
670 disk_source_destroy (struct case_source *source)
671 {
672   struct disk_stream_info *info = source->aux;
673   if (info->file != NULL)
674     fclose (info->file);
675   free (info);
676 }
677
678 /* Disk source. */
679 const struct case_source_class disk_source_class = 
680   {
681     "disk",
682     disk_source_count,
683     disk_source_read,
684     disk_source_destroy,
685   };
686 \f
687 /* Memory case stream. */
688
689 /* Memory sink data. */
690 struct memory_sink_info
691   {
692     size_t case_cnt;            /* Number of cases. */
693     size_t case_size;           /* Case size in bytes. */
694     int max_cases;              /* Maximum cases before switching to disk. */
695     struct case_list *head;     /* First case in list. */
696     struct case_list *tail;     /* Last case in list. */
697   };
698
699 /* Memory source data. */
700 struct memory_source_info 
701   {
702     size_t case_cnt;            /* Number of cases. */
703     size_t case_size;           /* Case size in bytes. */
704     struct case_list *cases;    /* List of cases. */
705   };
706
707 static void
708 memory_sink_create (struct case_sink *sink) 
709 {
710   struct memory_sink_info *info;
711   
712   sink->aux = info = xmalloc (sizeof *info);
713
714   assert (compaction_nval > 0);
715   info->case_cnt = 0;
716   info->case_size = compaction_nval * sizeof (union value);
717   info->max_cases = set_max_workspace / info->case_size;
718   info->head = info->tail = NULL;
719 }
720
721 static void
722 memory_sink_write (struct case_sink *sink, struct ccase *c) 
723 {
724   struct memory_sink_info *info = sink->aux;
725   size_t case_size;
726   struct case_list *new_case;
727
728   case_size = sizeof (struct case_list)
729                       + ((compaction_nval - 1) * sizeof (union value));
730   new_case = malloc (case_size);
731
732   /* If we've got memory to spare then add it to the linked list. */
733   if (info->case_cnt <= info->max_cases && new_case != NULL)
734     {
735       info->case_cnt++;
736
737       /* Append case to linked list. */
738       new_case->next = NULL;
739       if (info->head != NULL)
740         info->tail->next = new_case;
741       else
742         info->head = new_case;
743       info->tail = new_case;
744
745       /* Copy data into case. */
746       if (compaction_necessary)
747         compact_case (&new_case->c, c);
748       else
749         memcpy (&new_case->c, c, sizeof (union value) * compaction_nval);
750     }
751   else
752     {
753       /* Out of memory.  Write the active file to disk. */
754       struct case_list *cur, *next;
755
756       /* Notify the user. */
757       if (!new_case)
758         msg (MW, _("Virtual memory exhausted.  Writing active file "
759                    "to disk."));
760       else
761         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
762                    "overflowed.  Writing active file to disk."),
763              set_max_workspace / 1024, info->max_cases,
764              compaction_nval * sizeof (union value));
765
766       free (new_case);
767
768       /* Switch to a disk sink. */
769       vfm_sink = create_case_sink (&disk_sink_class, NULL);
770       vfm_sink->class->open (vfm_sink);
771       workspace_overflow = 1;
772
773       /* Write the cases to disk and destroy them.  We can't call
774          vfm->sink->write() because of compaction. */
775       for (cur = info->head; cur; cur = next)
776         {
777           next = cur->next;
778           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
779                       vfm_sink->aux) != 1)
780             {
781               msg (ME, _("An error occurred while attempting to "
782                          "write to a temporary file created as the "
783                          "active file: %s."),
784                    strerror (errno));
785               err_failure ();
786             }
787           free (cur);
788         }
789
790       /* Write the current case to disk. */
791       vfm_sink->class->write (vfm_sink, c);
792     }
793 }
794
795 /* If the data is stored in memory, causes it to be written to disk.
796    To be called only *between* procedure()s, not within them. */
797 void
798 write_active_file_to_disk (void)
799 {
800   if (case_source_is_class (vfm_source, &memory_source_class))
801     {
802       struct memory_source_info *info = vfm_source->aux;
803
804       /* Switch to a disk sink. */
805       vfm_sink = create_case_sink (&disk_sink_class, NULL);
806       vfm_sink->class->open (vfm_sink);
807       workspace_overflow = 1;
808       
809       /* Write the cases to disk and destroy them.  We can't call
810          vfm->sink->write() because of compaction. */
811       {
812         struct case_list *cur, *next;
813         
814         for (cur = info->cases; cur; cur = next)
815           {
816             next = cur->next;
817             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
818                         vfm_sink->aux) != 1)
819               {
820                 msg (ME, _("An error occurred while attempting to "
821                            "write to a temporary file created as the "
822                            "active file: %s."),
823                      strerror (errno));
824                 err_failure ();
825               }
826             free (cur);
827           }
828       }
829       
830       vfm_source = vfm_sink->class->make_source (vfm_sink);
831       vfm_sink = NULL;
832     }
833 }
834
835 /* Destroy all memory sink data. */
836 static void
837 memory_sink_destroy (struct case_sink *sink)
838 {
839   struct memory_sink_info *info = sink->aux;
840   struct case_list *cur, *next;
841   
842   for (cur = info->head; cur; cur = next)
843     {
844       next = cur->next;
845       free (cur);
846     }
847   free (info);
848 }
849
850 /* Switch the memory stream from sink to source mode. */
851 static struct case_source *
852 memory_sink_make_source (struct case_sink *sink)
853 {
854   struct memory_sink_info *sink_info = sink->aux;
855   struct memory_source_info *source_info;
856
857   source_info = xmalloc (sizeof *source_info);
858   source_info->case_cnt = sink_info->case_cnt;
859   source_info->case_size = sink_info->case_size;
860   source_info->cases = sink_info->head;
861
862   free (sink_info);
863
864   return create_case_source (&memory_source_class, source_info);
865 }
866
867 const struct case_sink_class memory_sink_class = 
868   {
869     "memory",
870     memory_sink_create,
871     memory_sink_write,
872     memory_sink_destroy,
873     memory_sink_make_source,
874   };
875
876 /* Returns the number of cases in the source. */
877 static int
878 memory_source_count (const struct case_source *source) 
879 {
880   struct memory_source_info *info = source->aux;
881
882   return info->case_cnt;
883 }
884
885 /* Reads the case stream from memory and passes it to write_case(). */
886 static void
887 memory_source_read (struct case_source *source,
888                     write_case_func *write_case, write_case_data wc_data)
889 {
890   struct memory_source_info *info = source->aux;
891
892   while (info->cases != NULL) 
893     {
894       struct case_list *iter = info->cases;
895       info->cases = iter->next;
896       memcpy (temp_case, &iter->c, info->case_size);
897       free (iter);
898       
899       if (!write_case (wc_data))
900         return;
901     }
902 }
903
904 /* Destroy all memory source data. */
905 static void
906 memory_source_destroy (struct case_source *source)
907 {
908   struct memory_source_info *info = source->aux;
909   struct case_list *cur, *next;
910   
911   for (cur = info->cases; cur; cur = next)
912     {
913       next = cur->next;
914       free (cur);
915     }
916   free (info);
917 }
918
919 struct case_list *
920 memory_source_get_cases (const struct case_source *source) 
921 {
922   struct memory_source_info *info = source->aux;
923
924   return info->cases;
925 }
926
927 void
928 memory_source_set_cases (const struct case_source *source,
929                          struct case_list *cases) 
930 {
931   struct memory_source_info *info = source->aux;
932
933   info->cases = cases;
934 }
935
936 /* Memory stream. */
937 const struct case_source_class memory_source_class = 
938   {
939     "memory",
940     memory_source_count,
941     memory_source_read,
942     memory_source_destroy,
943   };
944 \f
945 /* Add temp_case to the lag queue. */
946 static void
947 lag_case (void)
948 {
949   if (lag_count < n_lag)
950     lag_count++;
951   memcpy (lag_queue[lag_head], temp_case,
952           dict_get_case_size (temp_dict));
953   if (++lag_head >= n_lag)
954     lag_head = 0;
955 }
956
957 /* Returns a pointer to the lagged case from N_BEFORE cases before the
958    current one, or NULL if there haven't been that many cases yet. */
959 struct ccase *
960 lagged_case (int n_before)
961 {
962   assert (n_before <= n_lag);
963   if (n_before > lag_count)
964     return NULL;
965   
966   {
967     int index = lag_head - n_before;
968     if (index < 0)
969       index += n_lag;
970     return lag_queue[index];
971   }
972 }
973    
974 /* Transforms temp_case and writes it to the replacement active file
975    if advisable.  Returns nonzero if more cases can be accepted, zero
976    otherwise.  Do not call this function again after it has returned
977    zero once.  */
978 int
979 procedure_write_case (write_case_data wc_data)
980 {
981   struct procedure_aux_data *proc_aux = wc_data->aux;
982
983   /* Index of current transformation. */
984   int cur_trns;
985
986   /* Return value: whether it's reasonable to write any more cases. */
987   int more_cases = 1;
988
989   cur_trns = f_trns;
990   for (;;)
991     {
992       /* Output the case if this is temp_trns. */
993       if (cur_trns == temp_trns)
994         {
995           int case_limit;
996
997           if (n_lag)
998             lag_case ();
999           
1000           vfm_sink->class->write (vfm_sink, temp_case);
1001
1002           proc_aux->cases_written++;
1003           case_limit = dict_get_case_limit (default_dict);
1004           if (case_limit != 0 && proc_aux->cases_written >= case_limit)
1005             more_cases = 0;
1006         }
1007
1008       /* Are we done? */
1009       if (cur_trns >= n_trns)
1010         break;
1011       
1012       /* Decide which transformation should come next. */
1013       {
1014         int code;
1015         
1016         code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case,
1017                                        proc_aux->cases_written + 1);
1018         switch (code)
1019           {
1020           case -1:
1021             /* Next transformation. */
1022             cur_trns++;
1023             break;
1024           case -2:
1025             /* Delete this case. */
1026             goto done;
1027           default:
1028             /* Go to that transformation. */
1029             cur_trns = code;
1030             break;
1031           }
1032       }
1033     }
1034
1035   /* Call the beginning of group function. */
1036   if (!case_count && wc_data->begin_func != NULL)
1037     wc_data->begin_func (wc_data->func_aux);
1038
1039   /* Call the procedure if there is one and FILTER and PROCESS IF
1040      don't prohibit it. */
1041   if (wc_data->proc_func != NULL
1042       && !exclude_this_case (proc_aux->cases_written + 1))
1043     wc_data->proc_func (temp_case, wc_data->func_aux);
1044
1045   case_count++;
1046   
1047 done:
1048   clear_temp_case ();
1049   
1050   /* Return previously determined value. */
1051   return more_cases;
1052 }
1053
1054 /* Clears the variables in the temporary case that need to be
1055    cleared between processing cases.  */
1056 static void
1057 clear_temp_case (void)
1058 {
1059   /* FIXME?  This is linear in the number of variables, but
1060      doesn't need to be, so it's an easy optimization target. */
1061   size_t var_cnt = dict_get_var_cnt (default_dict);
1062   size_t i;
1063   
1064   for (i = 0; i < var_cnt; i++) 
1065     {
1066       struct variable *v = dict_get_var (default_dict, i);
1067       if (v->init && v->reinit) 
1068         {
1069           if (v->type == NUMERIC) 
1070             temp_case->data[v->fv].f = SYSMIS;
1071           else
1072             memset (temp_case->data[v->fv].s, ' ', v->width);
1073         } 
1074     }
1075 }
1076
1077 /* Returns nonzero if this case (numbered CASE_NUM) should be
1078    exclude as specified on FILTER or PROCESS IF, otherwise
1079    zero. */
1080 static int
1081 exclude_this_case (int case_num)
1082 {
1083   /* FILTER. */
1084   struct variable *filter_var = dict_get_filter (default_dict);
1085   if (filter_var != NULL) 
1086     {
1087       double f = temp_case->data[filter_var->fv].f;
1088       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
1089         return 1;
1090     }
1091
1092   /* PROCESS IF. */
1093   if (process_if_expr != NULL
1094       && expr_evaluate (process_if_expr, temp_case, case_num, NULL) != 1.0)
1095     return 1;
1096
1097   return 0;
1098 }
1099
1100 /* Appends TRNS to t_trns[], the list of all transformations to be
1101    performed on data as it is read from the active file. */
1102 void
1103 add_transformation (struct trns_header * trns)
1104 {
1105   if (n_trns >= m_trns)
1106     {
1107       m_trns += 16;
1108       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1109     }
1110   t_trns[n_trns] = trns;
1111   trns->index = n_trns++;
1112 }
1113
1114 /* Cancels all active transformations, including any transformations
1115    created by the input program. */
1116 void
1117 cancel_transformations (void)
1118 {
1119   int i;
1120   for (i = 0; i < n_trns; i++)
1121     {
1122       if (t_trns[i]->free)
1123         t_trns[i]->free (t_trns[i]);
1124       free (t_trns[i]);
1125     }
1126   n_trns = f_trns = 0;
1127   if (m_trns > 32)
1128     {
1129       free (t_trns);
1130       m_trns = 0;
1131     }
1132 }
1133
1134 /* Dumps out the values of all the split variables for the case C. */
1135 static void
1136 dump_splits (struct ccase *c)
1137 {
1138   struct variable *const *split;
1139   struct tab_table *t;
1140   size_t split_cnt;
1141   int i;
1142
1143   split_cnt = dict_get_split_cnt (default_dict);
1144   t = tab_create (3, split_cnt + 1, 0);
1145   tab_dim (t, tab_natural_dimensions);
1146   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
1147   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
1148   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1149   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1150   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1151   split = dict_get_split_vars (default_dict);
1152   for (i = 0; i < split_cnt; i++)
1153     {
1154       struct variable *v = split[i];
1155       char temp_buf[80];
1156       const char *val_lab;
1157
1158       assert (v->type == NUMERIC || v->type == ALPHA);
1159       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1160       
1161       data_out (temp_buf, &v->print, &c->data[v->fv]);
1162       
1163       temp_buf[v->print.w] = 0;
1164       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1165
1166       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1167       if (val_lab)
1168         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1169     }
1170   tab_flags (t, SOMF_NO_TITLE);
1171   tab_submit (t);
1172 }
1173
1174 /* This proc_func is substituted for the user-supplied proc_func when
1175    SPLIT FILE is active.  This function forms a wrapper around that
1176    proc_func by dividing the input into series. */
1177 static int
1178 SPLIT_FILE_proc_func (struct ccase *c, void *data_)
1179 {
1180   struct write_case_data *data = data_;
1181   struct split_aux_data *split_aux = data->aux;
1182   struct variable *const *split;
1183   size_t split_cnt;
1184   size_t i;
1185
1186   /* The first case always begins a new series.  We also need to
1187      preserve the values of the case for later comparison. */
1188   if (case_count == 0)
1189     {
1190       memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
1191
1192       dump_splits (c);
1193       if (data->begin_func != NULL)
1194         data->begin_func (data->func_aux);
1195       
1196       return data->proc_func (c, data->func_aux);
1197     }
1198
1199   /* Compare the value of each SPLIT FILE variable to the values on
1200      the previous case. */
1201   split = dict_get_split_vars (default_dict);
1202   split_cnt = dict_get_split_cnt (default_dict);
1203   for (i = 0; i < split_cnt; i++)
1204     {
1205       struct variable *v = split[i];
1206       
1207       switch (v->type)
1208         {
1209         case NUMERIC:
1210           if (c->data[v->fv].f != split_aux->prev_case->data[v->fv].f)
1211             goto not_equal;
1212           break;
1213         case ALPHA:
1214           if (memcmp (c->data[v->fv].s,
1215                       split_aux->prev_case->data[v->fv].s, v->width))
1216             goto not_equal;
1217           break;
1218         default:
1219           assert (0);
1220         }
1221     }
1222   return data->proc_func (c, data->func_aux);
1223   
1224 not_equal:
1225   /* The values of the SPLIT FILE variable are different from the
1226      values on the previous case.  That means that it's time to begin
1227      a new series. */
1228   if (data->end_func != NULL)
1229     data->end_func (data->func_aux);
1230   dump_splits (c);
1231   if (data->begin_func != NULL)
1232     data->begin_func (data->func_aux);
1233   memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
1234   return data->proc_func (c, data->func_aux);
1235 }
1236 \f
1237 /* Case compaction. */
1238
1239 /* Copies case SRC to case DEST, compacting it in the process. */
1240 void
1241 compact_case (struct ccase *dest, const struct ccase *src)
1242 {
1243   int i;
1244   int nval = 0;
1245   size_t var_cnt;
1246   
1247   assert (compaction_necessary);
1248
1249   if (temporary == 2)
1250     {
1251       if (dest != compaction_case)
1252         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1253       return;
1254     }
1255
1256   /* Copy all the variables except the scratch variables from SRC to
1257      DEST. */
1258   var_cnt = dict_get_var_cnt (default_dict);
1259   for (i = 0; i < var_cnt; i++)
1260     {
1261       struct variable *v = dict_get_var (default_dict, i);
1262       
1263       if (v->name[0] == '#')
1264         continue;
1265
1266       if (v->type == NUMERIC)
1267         dest->data[nval++] = src->data[v->fv];
1268       else
1269         {
1270           int w = DIV_RND_UP (v->width, sizeof (union value));
1271           
1272           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1273           nval += w;
1274         }
1275     }
1276 }
1277
1278 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1279 static void
1280 finish_compaction (void)
1281 {
1282   int i;
1283
1284   for (i = 0; i < dict_get_var_cnt (default_dict); )
1285     {
1286       struct variable *v = dict_get_var (default_dict, i);
1287
1288       if (v->name[0] == '#') 
1289         dict_delete_var (default_dict, v);
1290       else
1291         i++;
1292     }
1293   dict_compact_values (default_dict);
1294 }
1295
1296 struct case_source *
1297 create_case_source (const struct case_source_class *class, void *aux) 
1298 {
1299   struct case_source *source = xmalloc (sizeof *source);
1300   source->class = class;
1301   source->aux = aux;
1302   return source;
1303 }
1304
1305 int
1306 case_source_is_complex (const struct case_source *source) 
1307 {
1308   return source != NULL && (source->class == &input_program_source_class
1309                             || source->class == &file_type_source_class);
1310 }
1311
1312 int
1313 case_source_is_class (const struct case_source *source,
1314                       const struct case_source_class *class) 
1315 {
1316   return source != NULL && source->class == class;
1317 }
1318
1319 struct case_sink *
1320 create_case_sink (const struct case_sink_class *class, void *aux) 
1321 {
1322   struct case_sink *sink = xmalloc (sizeof *sink);
1323   sink->class = class;
1324   sink->aux = aux;
1325   return sink;
1326 }
1327