51bd02cc5c14e8707789670ec81e4324ed947697
[pspp-builds.git] / src / vfm.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "vfm.h"
22 #include "vfmP.h"
23 #include <assert.h>
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #if HAVE_UNISTD_H
28 #include <unistd.h>     /* Required by SunOS4. */
29 #endif
30 #include "alloc.h"
31 #include "do-ifP.h"
32 #include "error.h"
33 #include "expr.h"
34 #include "misc.h"
35 #include "random.h"
36 #include "settings.h"
37 #include "som.h"
38 #include "str.h"
39 #include "tab.h"
40 #include "var.h"
41 #include "value-labels.h"
42
43 /*
44    Virtual File Manager (vfm):
45
46    vfm is used to process data files.  It uses the model that
47    data is read from one stream (the data source), processed,
48    then written to another (the data sink).  The data source is
49    then deleted and the data sink becomes the data source for the
50    next procedure. */
51
52 /* Procedure execution data. */
53 struct write_case_data
54   {
55     /* Functions to call... */
56     void (*begin_func) (void *);               /* ...before data. */
57     int (*proc_func) (struct ccase *, void *); /* ...with data. */
58     void (*end_func) (void *);                 /* ...after data. */
59     void *func_aux;                            /* Auxiliary data. */ 
60
61     /* Extra auxiliary data. */
62     void *aux;
63   };
64
65 /* The current active file, from which cases are read. */
66 struct case_source *vfm_source;
67
68 /* The replacement active file, to which cases are written. */
69 struct case_sink *vfm_sink;
70
71 /* Nonzero if the case needs to have values deleted before being
72    stored, zero otherwise. */
73 int compaction_necessary;
74
75 /* Number of values after compaction. */
76 int compaction_nval;
77
78 /* Temporary case buffer with enough room for `compaction_nval'
79    `value's. */
80 struct ccase *compaction_case;
81
82 /* Nonzero means that we've overflowed our allotted workspace.
83    After that happens once during a session, we always store the
84    active file on disk instead of in memory.  (This policy may be
85    too aggressive.) */
86 static int workspace_overflow = 0;
87
88 /* Time at which vfm was last invoked. */
89 time_t last_vfm_invocation;
90
91 /* Number of cases passed to proc_func(). */
92 static int case_count;
93
94 /* Lag queue. */
95 int n_lag;                      /* Number of cases to lag. */
96 static int lag_count;           /* Number of cases in lag_queue so far. */
97 static int lag_head;            /* Index where next case will be added. */
98 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
99
100 static struct ccase *create_trns_case (struct dictionary *dict);
101 static void open_active_file (void);
102 static void close_active_file (struct write_case_data *);
103 static int SPLIT_FILE_proc_func (struct ccase *, void *);
104 static void finish_compaction (void);
105 static void lag_case (const struct ccase *);
106 static write_case_func procedure_write_case;
107 static void clear_case (struct ccase *);
108 static int exclude_this_case (const struct ccase *, int case_num);
109 \f
110 /* Public functions. */
111
112 /* Auxiliary data for executing a procedure. */
113 struct procedure_aux_data 
114   {
115     struct ccase *trns_case;    /* Case used for transformations. */
116     size_t cases_written;       /* Number of cases written so far. */
117   };
118
119 /* Auxiliary data for SPLIT FILE. */
120 struct split_aux_data 
121   {
122     struct ccase *prev_case;    /* Data in previous case. */
123   };
124
125 /* Reads all the cases from the active file, transforms them by
126    the active set of transformations, passes each of them to
127    PROC_FUNC, and writes them to a new active file.
128
129    Divides the active file into zero or more series of one or more
130    cases each.  BEGIN_FUNC is called before each series.  END_FUNC is
131    called after each series.
132
133    Arbitrary user-specified data AUX is passed to BEGIN_FUNC,
134    PROC_FUNC, and END_FUNC as auxiliary data. */
135 void
136 procedure (void (*begin_func) (void *),
137            int (*proc_func) (struct ccase *, void *),
138            void (*end_func) (void *),
139            void *func_aux)
140 {
141   static int recursive_call;
142
143   struct write_case_data procedure_write_data;
144   struct procedure_aux_data proc_aux;
145
146   struct write_case_data split_file_data;
147   struct split_aux_data split_aux;
148   int split;
149
150   assert (++recursive_call == 1);
151
152   proc_aux.cases_written = 0;
153   proc_aux.trns_case = create_trns_case (default_dict);
154
155   /* Normally we just use the data passed by the user. */
156   procedure_write_data.begin_func = begin_func;
157   procedure_write_data.proc_func = proc_func;
158   procedure_write_data.end_func = end_func;
159   procedure_write_data.func_aux = func_aux;
160   procedure_write_data.aux = &proc_aux;
161
162   /* Under SPLIT FILE, we add a layer of indirection. */
163   split = dict_get_split_cnt (default_dict) > 0;
164   if (split) 
165     {
166       split_file_data = procedure_write_data;
167       split_file_data.aux = &split_aux;
168
169       split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
170
171       procedure_write_data.begin_func = NULL;
172       procedure_write_data.proc_func = SPLIT_FILE_proc_func;
173       procedure_write_data.end_func = end_func;
174       procedure_write_data.func_aux = &split_file_data;
175     }
176
177   last_vfm_invocation = time (NULL);
178
179   open_active_file ();
180   if (vfm_source != NULL) 
181     vfm_source->class->read (vfm_source,
182                              proc_aux.trns_case,
183                              procedure_write_case, &procedure_write_data);
184   close_active_file (&procedure_write_data);
185
186   if (split)
187     free (split_aux.prev_case);
188
189   free (proc_aux.trns_case);
190
191   assert (--recursive_call == 0);
192 }
193 \f
194 /* Active file processing support.  Subtly different semantics from
195    procedure(). */
196
197 static write_case_func process_active_file_write_case;
198
199 /* The case_func might want us to stop calling it. */
200 static int not_canceled;
201
202 /* Reads all the cases from the active file and passes them
203    one-by-one to CASE_FUNC.  Before any cases are passed, calls
204    BEGIN_FUNC.  After all the cases have been passed, calls
205    END_FUNC.  BEGIN_FUNC, CASE_FUNC, and END_FUNC can write to
206    the output file by calling process_active_file_output_case().
207
208    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
209 void
210 process_active_file (void (*begin_func) (void *),
211                      int (*case_func) (struct ccase *, void *),
212                      void (*end_func) (void *),
213                      void *func_aux)
214 {
215   struct procedure_aux_data proc_aux;
216   struct write_case_data process_active_write_data;
217
218   proc_aux.cases_written = 0;
219   proc_aux.trns_case = create_trns_case (default_dict);
220
221   process_active_write_data.begin_func = begin_func;
222   process_active_write_data.proc_func = case_func;
223   process_active_write_data.end_func = end_func;
224   process_active_write_data.func_aux = func_aux;
225   process_active_write_data.aux = &proc_aux;
226
227   not_canceled = 1;
228
229   open_active_file ();
230   begin_func (func_aux);
231   if (vfm_source != NULL)
232     vfm_source->class->read (vfm_source, proc_aux.trns_case,
233                              process_active_file_write_case,
234                              &process_active_write_data);
235   end_func (func_aux);
236   close_active_file (&process_active_write_data);
237 }
238
239 /* Pass the current case to case_func. */
240 static int
241 process_active_file_write_case (struct write_case_data *wc_data)
242 {
243   struct procedure_aux_data *proc_aux = wc_data->aux;
244   int cur_trns;         /* Index of current transformation. */
245
246   for (cur_trns = f_trns; cur_trns != temp_trns; )
247     {
248       int code;
249         
250       code = t_trns[cur_trns]->proc (t_trns[cur_trns], proc_aux->trns_case,
251                                      case_count + 1);
252       switch (code)
253         {
254         case -1:
255           /* Next transformation. */
256           cur_trns++;
257           break;
258         case -2:
259           /* Delete this case. */
260           goto done;
261         default:
262           /* Go to that transformation. */
263           cur_trns = code;
264           break;
265         }
266     }
267
268   if (n_lag)
269     lag_case (proc_aux->trns_case);
270           
271   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
272   if (not_canceled && !exclude_this_case (proc_aux->trns_case, case_count + 1))
273     not_canceled = wc_data->proc_func (proc_aux->trns_case, wc_data->func_aux);
274   
275   case_count++;
276   
277  done:
278   clear_case (proc_aux->trns_case);
279
280   return 1;
281 }
282
283 /* Write the given case to the active file. */
284 void
285 process_active_file_output_case (const struct ccase *c)
286 {
287   vfm_sink->class->write (vfm_sink, c);
288 }
289 \f
290 /* Creates and returns a case, initializing it from the vectors
291    that say which `value's need to be initialized just once, and
292    which ones need to be re-initialized before every case. */
293 static struct ccase *
294 create_trns_case (struct dictionary *dict)
295 {
296   struct ccase *c = xmalloc (dict_get_case_size (dict));
297   size_t var_cnt = dict_get_var_cnt (dict);
298   size_t i;
299
300   for (i = 0; i < var_cnt; i++) 
301     {
302       struct variable *v = dict_get_var (dict, i);
303
304       if (v->type == NUMERIC) 
305         {
306           if (v->reinit)
307             c->data[v->fv].f = 0.0;
308           else
309             c->data[v->fv].f = SYSMIS;
310         }
311       else
312         memset (c->data[v->fv].s, ' ', v->width);
313     }
314   return c;
315 }
316 \f
317 /* Opening the active file. */
318
319 /* It might be usefully noted that the following several functions are
320    given in the order that they are called by open_active_file(). */
321
322 /* Prepare to write to the replacement active file. */
323 static void
324 prepare_for_writing (void)
325 {
326   if (vfm_sink == NULL)
327     {
328       if (workspace_overflow)
329         vfm_sink = create_case_sink (&disk_sink_class, NULL);
330       else
331         vfm_sink = create_case_sink (&memory_sink_class, NULL);
332     }
333 }
334
335 /* Arrange for compacting the output cases for storage. */
336 static void
337 arrange_compaction (void)
338 {
339   int count_values = 0;
340
341   {
342     int i;
343     
344     /* Count up the number of `value's that will be output. */
345     for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
346       {
347         struct variable *v = dict_get_var (temp_dict, i);
348
349         if (dict_class_from_id (v->name) != DC_SCRATCH)
350           {
351             assert (v->nv > 0);
352             count_values += v->nv;
353           } 
354       }
355     assert (temporary == 2
356             || count_values <= dict_get_next_value_idx (temp_dict));
357   }
358   
359   /* Compaction is only necessary if the number of `value's to output
360      differs from the number already present. */
361   compaction_nval = count_values;
362   if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
363     compaction_necessary = 1;
364   else
365     compaction_necessary = 0;
366   
367   if (vfm_sink->class->open != NULL)
368     vfm_sink->class->open (vfm_sink);
369
370   if (compaction_necessary)
371     compaction_case = xmalloc (sizeof (struct ccase)
372                                + sizeof (union value) * (compaction_nval - 1));
373
374 }
375
376 #if DEBUGGING
377 /* Returns the name of the variable that owns the index CCASE_INDEX
378    into ccase. */
379 static const char *
380 index_to_varname (int ccase_index)
381 {
382   int i;
383
384   for (i = 0; i < default_dict.nvar; i++)
385     {
386       struct variable *v = default_dict.var[i];
387       
388       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
389         return default_dict.var[i]->name;
390     }
391   return _("<NOVAR>");
392 }
393 #endif
394
395 /* Sets all the lag-related variables based on value of n_lag. */
396 static void
397 setup_lag (void)
398 {
399   int i;
400   
401   if (n_lag == 0)
402     return;
403
404   lag_count = 0;
405   lag_head = 0;
406   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
407   for (i = 0; i < n_lag; i++)
408     lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
409 }
410
411 /* There is a lot of potential confusion in the vfm and related
412    routines over the number of `value's at each stage of the process.
413    Here is each nval count, with explanation, as set up by
414    open_active_file():
415
416    temp_dict->nval: Number of `value's in the cases after the
417    transformations leading up to TEMPORARY have been performed.
418
419    compaction_nval: Number of `value's in the cases after the
420    transformations leading up to TEMPORARY have been performed
421    and the case has been compacted by compact_case(), if
422    compaction is necessary.  This the number of `value's in the
423    cases saved by the sink stream.  (However, note that the cases
424    passed to the sink stream have not yet been compacted.  It is
425    the responsibility of the data sink to call compact_case().)
426    `compaction' becomes the new value of default_dict.nval after
427    the procedure is completed.
428
429    default_dict.nval: This is often an alias for temp_dict->nval.
430    As such it can really have no separate existence until the
431    procedure is complete.  For this reason it should *not* be
432    referenced inside the execution of a procedure. */
433 /* Makes all preparations for reading from the data source and writing
434    to the data sink. */
435 static void
436 open_active_file (void)
437 {
438   /* Sometimes we want to refer to the dictionary that applies to the
439      data actually written to the sink.  This is either temp_dict or
440      default_dict.  However, if TEMPORARY is not on, then temp_dict
441      does not apply.  So, we can set temp_dict to default_dict in this
442      case. */
443   if (!temporary)
444     {
445       temp_trns = n_trns;
446       temp_dict = default_dict;
447     }
448
449   /* No cases passed to the procedure yet. */
450   case_count = 0;
451
452   /* The rest. */
453   prepare_for_writing ();
454   arrange_compaction ();
455   discard_ctl_stack ();
456   setup_lag ();
457 }
458 \f
459 /* Closes the active file. */
460 static void
461 close_active_file (struct write_case_data *data)
462 {
463   /* Close the current case group. */
464   if (case_count && data->end_func != NULL)
465     data->end_func (data->func_aux);
466
467   /* Stop lagging (catch up?). */
468   if (n_lag)
469     {
470       int i;
471       
472       for (i = 0; i < n_lag; i++)
473         free (lag_queue[i]);
474       free (lag_queue);
475       n_lag = 0;
476     }
477   
478   /* Assume the dictionary from right before TEMPORARY, if any.  Turn
479      off TEMPORARY. */
480   if (temporary)
481     {
482       dict_destroy (default_dict);
483       default_dict = temp_dict;
484       temp_dict = NULL;
485     }
486
487   /* Finish compaction. */
488   if (compaction_necessary)
489     finish_compaction ();
490     
491   /* Old data sink --> New data source. */
492   if (vfm_source != NULL) 
493     {
494       if (vfm_source->class->destroy != NULL)
495         vfm_source->class->destroy (vfm_source);
496       free (vfm_source);
497     }
498
499   if (vfm_sink->class->make_source != NULL)
500     vfm_source = vfm_sink->class->make_source (vfm_sink);
501   else
502     vfm_source = NULL;
503
504   /* Old data sink is gone now. */
505   free (vfm_sink);
506   vfm_sink = NULL;
507
508   /* Cancel TEMPORARY. */
509   cancel_temporary ();
510
511   /* Free temporary cases. */
512   free (compaction_case);
513   compaction_case = NULL;
514
515   /* Cancel PROCESS IF. */
516   expr_free (process_if_expr);
517   process_if_expr = NULL;
518
519   /* Cancel FILTER if temporary. */
520   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
521     dict_set_filter (default_dict, NULL);
522
523   /* Cancel transformations. */
524   cancel_transformations ();
525
526   /* Turn off case limiter. */
527   dict_set_case_limit (default_dict, 0);
528
529   /* Clear VECTOR vectors. */
530   dict_clear_vectors (default_dict);
531 }
532 \f
533 /* Disk case stream. */
534
535 /* Information about disk sink or source. */
536 struct disk_stream_info 
537   {
538     FILE *file;                 /* Output file. */
539     size_t case_cnt;            /* Number of cases written so far. */
540     size_t case_size;           /* Number of bytes in case. */
541   };
542
543 /* Initializes the disk sink. */
544 static void
545 disk_sink_create (struct case_sink *sink)
546 {
547   struct disk_stream_info *info = xmalloc (sizeof *info);
548   info->file = tmpfile ();
549   info->case_cnt = 0;
550   info->case_size = compaction_nval;
551   sink->aux = info;
552   if (info->file == NULL)
553     {
554       msg (ME, _("An error occurred attempting to create a temporary "
555                  "file for use as the active file: %s."),
556            strerror (errno));
557       err_failure ();
558     }
559 }
560
561 /* Writes case C to the disk sink. */
562 static void
563 disk_sink_write (struct case_sink *sink, const struct ccase *c)
564 {
565   struct disk_stream_info *info = sink->aux;
566   const union value *src_case;
567
568   if (compaction_necessary)
569     {
570       compact_case (compaction_case, c);
571       src_case = compaction_case->data;
572     }
573   else src_case = c->data;
574
575   info->case_cnt++;
576   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
577               info->file) != 1)
578     {
579       msg (ME, _("An error occurred while attempting to write to a "
580                  "temporary file used as the active file: %s."),
581            strerror (errno));
582       err_failure ();
583     }
584 }
585
586 /* Destroys the sink's internal data. */
587 static void
588 disk_sink_destroy (struct case_sink *sink)
589 {
590   struct disk_stream_info *info = sink->aux;
591   if (info->file != NULL)
592     fclose (info->file);
593 }
594
595 /* Closes and destroys the sink and returns a disk source to read
596    back the written data. */
597 static struct case_source *
598 disk_sink_make_source (struct case_sink *sink) 
599 {
600   struct disk_stream_info *info = sink->aux;
601     
602   /* Rewind the file. */
603   assert (info->file != NULL);
604   if (fseek (info->file, 0, SEEK_SET) != 0)
605     {
606       msg (ME, _("An error occurred while attempting to rewind a "
607                  "temporary file used as the active file: %s."),
608            strerror (errno));
609       err_failure ();
610     }
611   
612   return create_case_source (&disk_source_class, default_dict, info);
613 }
614
615 /* Disk sink. */
616 const struct case_sink_class disk_sink_class = 
617   {
618     "disk",
619     disk_sink_create,
620     disk_sink_write,
621     disk_sink_destroy,
622     disk_sink_make_source,
623   };
624 \f
625 /* Disk source. */
626
627 /* Returns the number of cases that will be read by
628    disk_source_read(). */
629 static int
630 disk_source_count (const struct case_source *source) 
631 {
632   struct disk_stream_info *info = source->aux;
633
634   return info->case_cnt;
635 }
636
637 /* Reads all cases from the disk source and passes them one by one to
638    write_case(). */
639 static void
640 disk_source_read (struct case_source *source,
641                   struct ccase *c,
642                   write_case_func *write_case, write_case_data wc_data)
643 {
644   struct disk_stream_info *info = source->aux;
645   int i;
646
647   for (i = 0; i < info->case_cnt; i++)
648     {
649       if (!fread (c, info->case_size, 1, info->file))
650         {
651           msg (ME, _("An error occurred while attempting to read from "
652                "a temporary file created for the active file: %s."),
653                strerror (errno));
654           err_failure ();
655           break;
656         }
657
658       if (!write_case (wc_data))
659         break;
660     }
661 }
662
663 /* Destroys the source's internal data. */
664 static void
665 disk_source_destroy (struct case_source *source)
666 {
667   struct disk_stream_info *info = source->aux;
668   if (info->file != NULL)
669     fclose (info->file);
670   free (info);
671 }
672
673 /* Disk source. */
674 const struct case_source_class disk_source_class = 
675   {
676     "disk",
677     disk_source_count,
678     disk_source_read,
679     disk_source_destroy,
680   };
681 \f
682 /* Memory case stream. */
683
684 /* Memory sink data. */
685 struct memory_sink_info
686   {
687     size_t case_cnt;            /* Number of cases. */
688     size_t case_size;           /* Case size in bytes. */
689     int max_cases;              /* Maximum cases before switching to disk. */
690     struct case_list *head;     /* First case in list. */
691     struct case_list *tail;     /* Last case in list. */
692   };
693
694 /* Memory source data. */
695 struct memory_source_info 
696   {
697     size_t case_cnt;            /* Number of cases. */
698     size_t case_size;           /* Case size in bytes. */
699     struct case_list *cases;    /* List of cases. */
700   };
701
702 /* Creates the SINK memory sink. */
703 static void
704 memory_sink_create (struct case_sink *sink) 
705 {
706   struct memory_sink_info *info;
707   
708   sink->aux = info = xmalloc (sizeof *info);
709
710   assert (compaction_nval > 0);
711   info->case_cnt = 0;
712   info->case_size = compaction_nval * sizeof (union value);
713   info->max_cases = set_max_workspace / info->case_size;
714   info->head = info->tail = NULL;
715 }
716
717 /* Writes case C to memory sink SINK. */
718 static void
719 memory_sink_write (struct case_sink *sink, const struct ccase *c) 
720 {
721   struct memory_sink_info *info = sink->aux;
722   size_t case_size;
723   struct case_list *new_case;
724
725   case_size = sizeof (struct case_list)
726                       + ((compaction_nval - 1) * sizeof (union value));
727   new_case = malloc (case_size);
728
729   /* If we've got memory to spare then add it to the linked list. */
730   if (info->case_cnt <= info->max_cases && new_case != NULL)
731     {
732       info->case_cnt++;
733
734       /* Append case to linked list. */
735       new_case->next = NULL;
736       if (info->head != NULL)
737         info->tail->next = new_case;
738       else
739         info->head = new_case;
740       info->tail = new_case;
741
742       /* Copy data into case. */
743       if (compaction_necessary)
744         compact_case (&new_case->c, c);
745       else
746         memcpy (&new_case->c, c, sizeof (union value) * compaction_nval);
747     }
748   else
749     {
750       /* Out of memory.  Write the active file to disk. */
751       struct case_list *cur, *next;
752
753       /* Notify the user. */
754       if (!new_case)
755         msg (MW, _("Virtual memory exhausted.  Writing active file "
756                    "to disk."));
757       else
758         msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
759                    "overflowed.  Writing active file to disk."),
760              set_max_workspace / 1024, info->max_cases,
761              compaction_nval * sizeof (union value));
762
763       free (new_case);
764
765       /* Switch to a disk sink. */
766       vfm_sink = create_case_sink (&disk_sink_class, NULL);
767       vfm_sink->class->open (vfm_sink);
768       workspace_overflow = 1;
769
770       /* Write the cases to disk and destroy them.  We can't call
771          vfm->sink->write() because of compaction. */
772       for (cur = info->head; cur; cur = next)
773         {
774           next = cur->next;
775           if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
776                       vfm_sink->aux) != 1)
777             {
778               msg (ME, _("An error occurred while attempting to "
779                          "write to a temporary file created as the "
780                          "active file: %s."),
781                    strerror (errno));
782               err_failure ();
783             }
784           free (cur);
785         }
786
787       /* Write the current case to disk. */
788       vfm_sink->class->write (vfm_sink, c);
789     }
790 }
791
792 /* If the data is stored in memory, causes it to be written to disk.
793    To be called only *between* procedure()s, not within them. */
794 void
795 write_active_file_to_disk (void)
796 {
797   if (case_source_is_class (vfm_source, &memory_source_class))
798     {
799       struct memory_source_info *info = vfm_source->aux;
800
801       /* Switch to a disk sink. */
802       vfm_sink = create_case_sink (&disk_sink_class, NULL);
803       vfm_sink->class->open (vfm_sink);
804       workspace_overflow = 1;
805       
806       /* Write the cases to disk and destroy them.  We can't call
807          vfm->sink->write() because of compaction. */
808       {
809         struct case_list *cur, *next;
810         
811         for (cur = info->cases; cur; cur = next)
812           {
813             next = cur->next;
814             if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
815                         vfm_sink->aux) != 1)
816               {
817                 msg (ME, _("An error occurred while attempting to "
818                            "write to a temporary file created as the "
819                            "active file: %s."),
820                      strerror (errno));
821                 err_failure ();
822               }
823             free (cur);
824           }
825       }
826       
827       vfm_source = vfm_sink->class->make_source (vfm_sink);
828       vfm_sink = NULL;
829     }
830 }
831
832 /* Destroy all memory sink data. */
833 static void
834 memory_sink_destroy (struct case_sink *sink)
835 {
836   struct memory_sink_info *info = sink->aux;
837   struct case_list *cur, *next;
838   
839   for (cur = info->head; cur; cur = next)
840     {
841       next = cur->next;
842       free (cur);
843     }
844   free (info);
845 }
846
847 /* Switch the memory stream from sink to source mode. */
848 static struct case_source *
849 memory_sink_make_source (struct case_sink *sink)
850 {
851   struct memory_sink_info *sink_info = sink->aux;
852   struct memory_source_info *source_info;
853
854   source_info = xmalloc (sizeof *source_info);
855   source_info->case_cnt = sink_info->case_cnt;
856   source_info->case_size = sink_info->case_size;
857   source_info->cases = sink_info->head;
858
859   free (sink_info);
860
861   return create_case_source (&memory_source_class,
862                              default_dict, source_info);
863 }
864
865 const struct case_sink_class memory_sink_class = 
866   {
867     "memory",
868     memory_sink_create,
869     memory_sink_write,
870     memory_sink_destroy,
871     memory_sink_make_source,
872   };
873
874 /* Returns the number of cases in the source. */
875 static int
876 memory_source_count (const struct case_source *source) 
877 {
878   struct memory_source_info *info = source->aux;
879
880   return info->case_cnt;
881 }
882
883 /* Reads the case stream from memory and passes it to write_case(). */
884 static void
885 memory_source_read (struct case_source *source,
886                     struct ccase *c,
887                     write_case_func *write_case, write_case_data wc_data)
888 {
889   struct memory_source_info *info = source->aux;
890
891   while (info->cases != NULL) 
892     {
893       struct case_list *iter = info->cases;
894       memcpy (c, &iter->c, info->case_size);
895       if (!write_case (wc_data)) 
896         break;
897             
898       info->cases = iter->next;
899       free (iter);
900     }
901 }
902
903 /* Destroy all memory source data. */
904 static void
905 memory_source_destroy (struct case_source *source)
906 {
907   struct memory_source_info *info = source->aux;
908   struct case_list *cur, *next;
909   
910   for (cur = info->cases; cur; cur = next)
911     {
912       next = cur->next;
913       free (cur);
914     }
915   free (info);
916 }
917
918 /* Returns the list of cases in memory source SOURCE. */
919 struct case_list *
920 memory_source_get_cases (const struct case_source *source) 
921 {
922   struct memory_source_info *info = source->aux;
923
924   return info->cases;
925 }
926
927 /* Sets the list of cases in memory source SOURCE to CASES. */
928 void
929 memory_source_set_cases (const struct case_source *source,
930                          struct case_list *cases) 
931 {
932   struct memory_source_info *info = source->aux;
933
934   info->cases = cases;
935 }
936
937 /* Memory stream. */
938 const struct case_source_class memory_source_class = 
939   {
940     "memory",
941     memory_source_count,
942     memory_source_read,
943     memory_source_destroy,
944   };
945 \f
946 /* Add C to the lag queue. */
947 static void
948 lag_case (const struct ccase *c)
949 {
950   if (lag_count < n_lag)
951     lag_count++;
952   memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
953   if (++lag_head >= n_lag)
954     lag_head = 0;
955 }
956
957 /* Returns a pointer to the lagged case from N_BEFORE cases before the
958    current one, or NULL if there haven't been that many cases yet. */
959 struct ccase *
960 lagged_case (int n_before)
961 {
962   assert (n_before <= n_lag);
963   if (n_before > lag_count)
964     return NULL;
965   
966   {
967     int index = lag_head - n_before;
968     if (index < 0)
969       index += n_lag;
970     return lag_queue[index];
971   }
972 }
973    
974 /* Transforms trns_case and writes it to the replacement active
975    file if advisable.  Returns nonzero if more cases can be
976    accepted, zero otherwise.  Do not call this function again
977    after it has returned zero once.  */
978 int
979 procedure_write_case (write_case_data wc_data)
980 {
981   struct procedure_aux_data *proc_aux = wc_data->aux;
982
983   /* Index of current transformation. */
984   int cur_trns;
985
986   /* Return value: whether it's reasonable to write any more cases. */
987   int more_cases = 1;
988
989   cur_trns = f_trns;
990   for (;;)
991     {
992       /* Output the case if this is temp_trns. */
993       if (cur_trns == temp_trns)
994         {
995           int case_limit;
996
997           if (n_lag)
998             lag_case (proc_aux->trns_case);
999           
1000           vfm_sink->class->write (vfm_sink, proc_aux->trns_case);
1001
1002           proc_aux->cases_written++;
1003           case_limit = dict_get_case_limit (default_dict);
1004           if (case_limit != 0 && proc_aux->cases_written >= case_limit)
1005             more_cases = 0;
1006         }
1007
1008       /* Are we done? */
1009       if (cur_trns >= n_trns)
1010         break;
1011       
1012       /* Decide which transformation should come next. */
1013       {
1014         int code;
1015         
1016         code = t_trns[cur_trns]->proc (t_trns[cur_trns], proc_aux->trns_case,
1017                                        proc_aux->cases_written + 1);
1018         switch (code)
1019           {
1020           case -1:
1021             /* Next transformation. */
1022             cur_trns++;
1023             break;
1024           case -2:
1025             /* Delete this case. */
1026             goto done;
1027           default:
1028             /* Go to that transformation. */
1029             cur_trns = code;
1030             break;
1031           }
1032       }
1033     }
1034
1035   /* Call the beginning of group function. */
1036   if (!case_count && wc_data->begin_func != NULL)
1037     wc_data->begin_func (wc_data->func_aux);
1038
1039   /* Call the procedure if there is one and FILTER and PROCESS IF
1040      don't prohibit it. */
1041   if (wc_data->proc_func != NULL
1042       && !exclude_this_case (proc_aux->trns_case, proc_aux->cases_written + 1))
1043     wc_data->proc_func (proc_aux->trns_case, wc_data->func_aux);
1044
1045   case_count++;
1046   
1047 done:
1048   clear_case (proc_aux->trns_case);
1049   
1050   /* Return previously determined value. */
1051   return more_cases;
1052 }
1053
1054 /* Clears the variables in C that need to be cleared between
1055    processing cases.  */
1056 static void
1057 clear_case (struct ccase *c)
1058 {
1059   /* FIXME?  This is linear in the number of variables, but
1060      doesn't need to be, so it's an easy optimization target. */
1061   size_t var_cnt = dict_get_var_cnt (default_dict);
1062   size_t i;
1063   
1064   for (i = 0; i < var_cnt; i++) 
1065     {
1066       struct variable *v = dict_get_var (default_dict, i);
1067       if (v->init && v->reinit) 
1068         {
1069           if (v->type == NUMERIC) 
1070             c->data[v->fv].f = SYSMIS;
1071           else
1072             memset (c->data[v->fv].s, ' ', v->width);
1073         } 
1074     }
1075 }
1076
1077 /* Returns nonzero if case C with case number CASE_NUM should be
1078    exclude as specified on FILTER or PROCESS IF, otherwise
1079    zero. */
1080 static int
1081 exclude_this_case (const struct ccase *c, int case_num)
1082 {
1083   /* FILTER. */
1084   struct variable *filter_var = dict_get_filter (default_dict);
1085   if (filter_var != NULL) 
1086     {
1087       double f = c->data[filter_var->fv].f;
1088       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
1089         return 1;
1090     }
1091
1092   /* PROCESS IF. */
1093   if (process_if_expr != NULL
1094       && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
1095     return 1;
1096
1097   return 0;
1098 }
1099
1100 /* Appends TRNS to t_trns[], the list of all transformations to be
1101    performed on data as it is read from the active file. */
1102 void
1103 add_transformation (struct trns_header * trns)
1104 {
1105   if (n_trns >= m_trns)
1106     {
1107       m_trns += 16;
1108       t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
1109     }
1110   t_trns[n_trns] = trns;
1111   trns->index = n_trns++;
1112 }
1113
1114 /* Cancels all active transformations, including any transformations
1115    created by the input program. */
1116 void
1117 cancel_transformations (void)
1118 {
1119   int i;
1120   for (i = 0; i < n_trns; i++)
1121     {
1122       if (t_trns[i]->free)
1123         t_trns[i]->free (t_trns[i]);
1124       free (t_trns[i]);
1125     }
1126   n_trns = f_trns = 0;
1127   if (m_trns > 32)
1128     {
1129       free (t_trns);
1130       m_trns = 0;
1131     }
1132 }
1133
1134 /* Dumps out the values of all the split variables for the case C. */
1135 static void
1136 dump_splits (struct ccase *c)
1137 {
1138   struct variable *const *split;
1139   struct tab_table *t;
1140   size_t split_cnt;
1141   int i;
1142
1143   split_cnt = dict_get_split_cnt (default_dict);
1144   t = tab_create (3, split_cnt + 1, 0);
1145   tab_dim (t, tab_natural_dimensions);
1146   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
1147   tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
1148   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1149   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1150   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1151   split = dict_get_split_vars (default_dict);
1152   for (i = 0; i < split_cnt; i++)
1153     {
1154       struct variable *v = split[i];
1155       char temp_buf[80];
1156       const char *val_lab;
1157
1158       assert (v->type == NUMERIC || v->type == ALPHA);
1159       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
1160       
1161       data_out (temp_buf, &v->print, &c->data[v->fv]);
1162       
1163       temp_buf[v->print.w] = 0;
1164       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
1165
1166       val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1167       if (val_lab)
1168         tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1169     }
1170   tab_flags (t, SOMF_NO_TITLE);
1171   tab_submit (t);
1172 }
1173
1174 /* This proc_func is substituted for the user-supplied proc_func when
1175    SPLIT FILE is active.  This function forms a wrapper around that
1176    proc_func by dividing the input into series. */
1177 static int
1178 SPLIT_FILE_proc_func (struct ccase *c, void *data_)
1179 {
1180   struct write_case_data *data = data_;
1181   struct split_aux_data *split_aux = data->aux;
1182   struct variable *const *split;
1183   size_t split_cnt;
1184   size_t i;
1185
1186   /* The first case always begins a new series.  We also need to
1187      preserve the values of the case for later comparison. */
1188   if (case_count == 0)
1189     {
1190       memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
1191
1192       dump_splits (c);
1193       if (data->begin_func != NULL)
1194         data->begin_func (data->func_aux);
1195       
1196       return data->proc_func (c, data->func_aux);
1197     }
1198
1199   /* Compare the value of each SPLIT FILE variable to the values on
1200      the previous case. */
1201   split = dict_get_split_vars (default_dict);
1202   split_cnt = dict_get_split_cnt (default_dict);
1203   for (i = 0; i < split_cnt; i++)
1204     {
1205       struct variable *v = split[i];
1206       
1207       switch (v->type)
1208         {
1209         case NUMERIC:
1210           if (c->data[v->fv].f != split_aux->prev_case->data[v->fv].f)
1211             goto not_equal;
1212           break;
1213         case ALPHA:
1214           if (memcmp (c->data[v->fv].s,
1215                       split_aux->prev_case->data[v->fv].s, v->width))
1216             goto not_equal;
1217           break;
1218         default:
1219           assert (0);
1220         }
1221     }
1222   return data->proc_func (c, data->func_aux);
1223   
1224 not_equal:
1225   /* The values of the SPLIT FILE variable are different from the
1226      values on the previous case.  That means that it's time to begin
1227      a new series. */
1228   if (data->end_func != NULL)
1229     data->end_func (data->func_aux);
1230   dump_splits (c);
1231   if (data->begin_func != NULL)
1232     data->begin_func (data->func_aux);
1233   memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
1234   return data->proc_func (c, data->func_aux);
1235 }
1236 \f
1237 /* Case compaction. */
1238
1239 /* Copies case SRC to case DEST, compacting it in the process. */
1240 void
1241 compact_case (struct ccase *dest, const struct ccase *src)
1242 {
1243   int i;
1244   int nval = 0;
1245   size_t var_cnt;
1246   
1247   assert (compaction_necessary);
1248
1249   if (temporary == 2)
1250     {
1251       if (dest != compaction_case)
1252         memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
1253       return;
1254     }
1255
1256   /* Copy all the variables except the scratch variables from SRC to
1257      DEST. */
1258   var_cnt = dict_get_var_cnt (default_dict);
1259   for (i = 0; i < var_cnt; i++)
1260     {
1261       struct variable *v = dict_get_var (default_dict, i);
1262       
1263       if (dict_class_from_id (v->name) == DC_SCRATCH)
1264         continue;
1265
1266       if (v->type == NUMERIC)
1267         dest->data[nval++] = src->data[v->fv];
1268       else
1269         {
1270           int w = DIV_RND_UP (v->width, sizeof (union value));
1271           
1272           memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
1273           nval += w;
1274         }
1275     }
1276 }
1277
1278 /* Reassigns `fv' for each variable.  Deletes scratch variables. */
1279 static void
1280 finish_compaction (void)
1281 {
1282   int i;
1283
1284   for (i = 0; i < dict_get_var_cnt (default_dict); )
1285     {
1286       struct variable *v = dict_get_var (default_dict, i);
1287
1288       if (dict_class_from_id (v->name) == DC_SCRATCH) 
1289         dict_delete_var (default_dict, v);
1290       else
1291         i++;
1292     }
1293   dict_compact_values (default_dict);
1294 }
1295
1296 /* Creates a case source with class CLASS and auxiliary data AUX
1297    and based on dictionary DICT. */
1298 struct case_source *
1299 create_case_source (const struct case_source_class *class,
1300                     const struct dictionary *dict,
1301                     void *aux) 
1302 {
1303   struct case_source *source = xmalloc (sizeof *source);
1304   source->class = class;
1305   source->value_cnt = dict_get_next_value_idx (dict);
1306   source->aux = aux;
1307   return source;
1308 }
1309
1310 /* Returns nonzero if a case source is "complex". */
1311 int
1312 case_source_is_complex (const struct case_source *source) 
1313 {
1314   return source != NULL && (source->class == &input_program_source_class
1315                             || source->class == &file_type_source_class);
1316 }
1317
1318 /* Returns nonzero if CLASS is the class of SOURCE. */
1319 int
1320 case_source_is_class (const struct case_source *source,
1321                       const struct case_source_class *class) 
1322 {
1323   return source != NULL && source->class == class;
1324 }
1325
1326 /* Creates a case sink with class CLASS and auxiliary data
1327    AUX. */
1328 struct case_sink *
1329 create_case_sink (const struct case_sink_class *class, void *aux) 
1330 {
1331   struct case_sink *sink = xmalloc (sizeof *sink);
1332   sink->class = class;
1333   sink->aux = aux;
1334   return sink;
1335 }