1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */
28 #include <unistd.h> /* Required by SunOS4. */
41 #include "value-labels.h"
44 /* Virtual File Manager (vfm):
46 vfm is used to process data files. It uses the model that
47 data is read from one stream (the data source), processed,
48 then written to another (the data sink). The data source is
49 then deleted and the data sink becomes the data source for the next procedure. */
52 /* Procedure execution data. */
53 struct write_case_data
/* One instance is filled in by procedure() and its address is handed,
   together with write_case(), to the data source's read function. */
55 /* Function to call for each case. */
56 int (*proc_func) (struct ccase *, void *); /* Function. */
57 void *aux; /* Auxiliary data. */
/* Both case buffers below are allocated and freed by procedure(). */
59 struct ccase *trns_case; /* Case used for transformations. */
60 struct ccase *sink_case; /* Case written to sink, if
61 compaction is necessary. */
/* Counters maintained by write_case(): cases_written is incremented after
   the permanent transformations and the N OF CASES check, cases_analyzed
   after the temporary transformations and the filter checks. */
62 size_t cases_written; /* Cases output so far. */
63 size_t cases_analyzed; /* Cases passed to procedure so far. */
66 /* The current active file, from which cases are read.
   NULL is tolerated: procedure() and close_active_file() both test
   for it before using the source. */
67 struct case_source *vfm_source;
69 /* The replacement active file, to which cases are written.
   If this is NULL when open_active_file() runs, a storage sink is
   created and stored here. */
70 struct case_sink *vfm_sink;
72 /* Nonzero if the case needs to have values deleted before being
73 stored, zero otherwise. Recomputed by open_active_file(). */
74 static int compaction_necessary;
76 /* Nonzero means that we've overflowed our allotted workspace.
77 After that happens once during a session, we always store the
78 active file on disk instead of in memory. (This policy may be too aggressive.) */
/* Set to 1 by storage_sink_write() when the in-memory case limit is
   exceeded; consulted by storage_sink_open(). */
80 static int workspace_overflow = 0;
82 /* Time at which vfm was last invoked. Set by procedure() before any
   cases are read. */
83 time_t last_vfm_invocation;
/* Lag queue state. n_lag has external linkage; the other three variables
   are private to this file. lag_queue and its elements are allocated in
   open_active_file(). */
86 int n_lag; /* Number of cases to lag. */
87 static int lag_count; /* Number of cases in lag_queue so far. */
88 static int lag_head; /* Index where next case will be added. */
89 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
/* Forward declarations of the file-local helpers defined later in this
   file. */
91 static struct ccase *create_trns_case (struct dictionary *);
92 static void open_active_file (void);
93 static int write_case (struct write_case_data *wc_data);
94 static int execute_transformations (struct ccase *c,
95 struct trns_header **trns,
96 int first_idx, int last_idx,
98 static int filter_case (const struct ccase *c, int case_num);
99 static void lag_case (const struct ccase *c);
100 static void compact_case (struct ccase *dest, const struct ccase *src);
101 static void clear_case (struct ccase *c);
102 static void close_active_file (void);
104 /* Public functions. */
106 /* Reads the data from the input program and writes it to a new
107 active file. For each case we read from the input program, we do the following:
110 1. Execute permanent transformations. If these drop the case,
111 start the next case from step 1.
113 2. N OF CASES. If we have already written N cases, start the
114 next case from step 1.
116 3. Write case to replacement active file.
118 4. Execute temporary transformations. If these drop the case,
119 start the next case from step 1.
121 5. FILTER, PROCESS IF. If these drop the case, start the next case from step 1.
124 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
125 cases, start the next case from step 1.
127 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
129 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
/* Guards against re-entry: the counter must go 0 -> 1 on entry and
   1 -> 0 on exit.
   NOTE(review): the increment and decrement are written inside assert(),
   so they are not evaluated at all when NDEBUG is defined. */
131 static int recursive_call;
133 struct write_case_data wc_data;
135 assert (++recursive_call == 1);
/* Set up the per-run state shared with write_case(). */
137 wc_data.proc_func = proc_func;
139 wc_data.trns_case = create_trns_case (default_dict);
140 wc_data.sink_case = xmalloc (dict_get_case_size (default_dict));
141 wc_data.cases_written = 0;
143 last_vfm_invocation = time (NULL);
/* With no data source nothing is read, but the active file is still
   closed below. */
146 if (vfm_source != NULL)
147 vfm_source->class->read (vfm_source,
149 write_case, &wc_data);
150 close_active_file ();
/* Release the scratch cases allocated above. */
152 free (wc_data.sink_case);
153 free (wc_data.trns_case);
155 assert (--recursive_call == 0);
158 /* Creates and returns a case, initializing it from the vectors
159 that say which `value's need to be initialized just once, and
160 which ones need to be re-initialized before every case. */
161 static struct ccase *
162 create_trns_case (struct dictionary *dict)
/* The buffer is sized by dict_get_case_size (DICT); procedure() is the
   caller that eventually frees it. */
164 struct ccase *c = xmalloc (dict_get_case_size (dict));
165 size_t var_cnt = dict_get_var_cnt (dict);
168 for (i = 0; i < var_cnt; i++)
170 struct variable *v = dict_get_var (dict, i);
/* Numeric values start out as either 0.0 or SYSMIS; string values are
   filled with spaces over the variable's full width. */
172 if (v->type == NUMERIC)
175 c->data[v->fv].f = 0.0;
177 c->data[v->fv].f = SYSMIS;
180 memset (c->data[v->fv].s, ' ', v->width);
185 /* Makes all preparations for reading from the data source and writing to the data sink. */
188 open_active_file (void)
190 /* Make temp_dict refer to the dictionary right before data
195 temp_dict = default_dict;
198 /* Figure out compaction. */
/* Compaction is needed when the dictionary's next value index differs
   from its compacted value count. */
199 compaction_necessary = (dict_get_next_value_idx (temp_dict)
200 != dict_get_compacted_value_cnt (temp_dict));
/* Fall back to a storage sink if no sink has been installed. */
203 if (vfm_sink == NULL)
204 vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
/* The sink's open hook is optional. */
205 if (vfm_sink->class->open != NULL)
206 vfm_sink->class->open (vfm_sink);
208 /* Allocate memory for lag queue. */
/* One buffer of dict_get_case_size (temp_dict) bytes for each of the
   n_lag slots. */
215 lag_queue = xmalloc (n_lag * sizeof *lag_queue);
216 for (i = 0; i < n_lag; i++)
217 lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
220 /* Close any unclosed DO IF or LOOP constructs. */
221 discard_ctl_stack ();
224 /* Transforms trns_case and writes it to the replacement active
225 file if advisable. Returns nonzero if more cases can be
226 accepted, zero otherwise. Do not call this function again
227 after it has returned zero once. */
229 write_case (struct write_case_data *wc_data)
231 /* Execute permanent transformations. */
/* These are t_trns[f_trns] up to, but not including, t_trns[temp_trns].
   The case number is cases_written + 1 because the counter is only
   incremented further down, once the case has been accepted. */
232 if (!execute_transformations (wc_data->trns_case, t_trns, f_trns, temp_trns,
233 wc_data->cases_written + 1))
/* N OF CASES: a case limit of zero never triggers this test. */
237 if (dict_get_case_limit (default_dict)
238 && wc_data->cases_written >= dict_get_case_limit (default_dict))
240 wc_data->cases_written++;
242 /* Write case to LAG queue. */
244 lag_case (wc_data->trns_case);
246 /* Write case to replacement active file. */
/* A sink without a write hook is skipped. When compaction is needed the
   case is first packed into sink_case and that copy is written. */
247 if (vfm_sink->class->write != NULL)
249 if (compaction_necessary)
251 compact_case (wc_data->sink_case, wc_data->trns_case);
252 vfm_sink->class->write (vfm_sink, wc_data->sink_case);
255 vfm_sink->class->write (vfm_sink, wc_data->trns_case);
258 /* Execute temporary transformations. */
/* These are t_trns[temp_trns] up to, but not including, t_trns[n_trns]. */
259 if (!execute_transformations (wc_data->trns_case, t_trns, temp_trns, n_trns,
260 wc_data->cases_written))
263 /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
264 if (filter_case (wc_data->trns_case, wc_data->cases_written)
265 || (dict_get_case_limit (temp_dict)
266 && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
268 wc_data->cases_analyzed++;
270 /* Pass case to procedure. */
271 if (wc_data->proc_func != NULL)
272 wc_data->proc_func (wc_data->trns_case, wc_data->aux);
/* Reset the variables that must not carry over into the next case. */
275 clear_case (wc_data->trns_case);
279 /* Transforms case C using the transformations in TRNS[] with
280 indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
281 become case CASE_NUM (1-based) in the output file. Returns
282 zero if the case was filtered out by one of the
283 transformations, nonzero otherwise. */
285 execute_transformations (struct ccase *c,
286 struct trns_header **trns,
287 int first_idx, int last_idx,
/* The for statement below has no increment clause, so idx has to be
   advanced inside the loop body (presumably according to retval --
   TODO confirm). */
292 for (idx = first_idx; idx != last_idx; )
/* Each transformation is invoked through its own proc callback, which
   receives the transformation itself, the case and the case number. */
294 int retval = trns[idx]->proc (trns[idx], c, case_num);
313 /* Returns nonzero if case C with case number CASE_NUM should be
314 excluded as specified on FILTER or PROCESS IF, otherwise zero. */
317 filter_case (const struct ccase *c, int case_num)
/* FILTER: the filter variable's value is tested for being zero,
   system-missing, or user-missing. */
320 struct variable *filter_var = dict_get_filter (default_dict);
321 if (filter_var != NULL)
323 double f = c->data[filter_var->fv].f;
324 if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
/* PROCESS IF: the test below fires for any result other than exactly
   1.0. */
329 if (process_if_expr != NULL
330 && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
336 /* Add C to the lag queue. */
338 lag_case (const struct ccase *c)
/* lag_count is compared against the queue capacity n_lag (presumably it
   is incremented while below it -- TODO confirm). */
340 if (lag_count < n_lag)
/* Store a full copy of C (case size taken from temp_dict) in the slot at
   lag_head, then advance lag_head; reaching n_lag is detected so the
   index can wrap (presumably back to 0 -- TODO confirm). */
342 memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
343 if (++lag_head >= n_lag)
347 /* Copies case SRC to case DEST, compacting it in the process. */
349 compact_case (struct ccase *dest, const struct ccase *src)
/* Values are packed into DEST through the running index nval. A numeric
   variable contributes one value; a string variable contributes
   DIV_RND_UP (width, sizeof (union value)) values copied with memcpy.
   The variables are enumerated from default_dict. */
355 assert (compaction_necessary);
357 /* Copy all the variables except scratch variables from SRC to
359 var_cnt = dict_get_var_cnt (default_dict);
360 for (i = 0; i < var_cnt; i++)
362 struct variable *v = dict_get_var (default_dict, i);
364 if (dict_class_from_id (v->name) == DC_SCRATCH)
367 if (v->type == NUMERIC)
368 dest->data[nval++] = src->data[v->fv];
371 int w = DIV_RND_UP (v->width, sizeof (union value));
373 memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
379 /* Clears the variables in C that need to be cleared between cases. */
382 clear_case (struct ccase *c)
384 size_t var_cnt = dict_get_var_cnt (default_dict);
387 for (i = 0; i < var_cnt; i++)
389 struct variable *v = dict_get_var (default_dict, i);
/* Only variables with both init and reinit set are reset: numeric ones
   to SYSMIS, the others to spaces over their full width. */
390 if (v->init && v->reinit)
392 if (v->type == NUMERIC)
393 c->data[v->fv].f = SYSMIS;
395 memset (c->data[v->fv].s, ' ', v->width);
400 /* Closes the active file. */
402 close_active_file (void)
404 /* Free memory for lag queue, and turn off lagging. */
409 for (i = 0; i < n_lag; i++)
415 /* Dictionary from before TEMPORARY becomes permanent. */
418 dict_destroy (default_dict);
419 default_dict = temp_dict;
423 /* Finish compaction. */
424 if (compaction_necessary)
425 dict_compact_values (default_dict);
427 /* Free data source. */
/* The source's destroy hook is optional. */
428 if (vfm_source != NULL)
430 if (vfm_source->class->destroy != NULL)
431 vfm_source->class->destroy (vfm_source);
435 /* Old data sink becomes new data source. */
/* make_source and destroy are both optional hooks of the sink class. */
436 if (vfm_sink->class->make_source != NULL)
437 vfm_source = vfm_sink->class->make_source (vfm_sink);
440 if (vfm_sink->class->destroy != NULL)
441 vfm_sink->class->destroy (vfm_sink);
/* free_case_sink() leaves calling the destroy hook to its caller. */
444 free_case_sink (vfm_sink);
447 /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
448 and get rid of all the transformations. */
450 expr_free (process_if_expr);
451 process_if_expr = NULL;
/* The filter is cleared unless FILTER_before_TEMPORARY is nonzero. */
452 if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
453 dict_set_filter (default_dict, NULL);
454 dict_set_case_limit (default_dict, 0);
455 dict_clear_vectors (default_dict);
456 cancel_transformations ();
459 /* Storage case stream. */
461 /* Information about storage sink or source. */
462 struct storage_stream_info
/* A single instance is created by storage_sink_open() and later handed
   unchanged to the storage source made by storage_sink_make_source(). */
464 size_t case_cnt; /* Number of cases. */
465 size_t case_size; /* Number of bytes in case. */
466 enum { DISK, MEMORY } mode; /* Where is data stored? */
469 FILE *file; /* Data file. */
471 /* Memory storage. */
/* NOTE(review): max_cases is an int but is compared against the size_t
   case_cnt in storage_sink_write(). */
472 int max_cases; /* Maximum cases before switching to disk. */
473 struct case_list *head; /* First case in list. */
474 struct case_list *tail; /* Last case in list. */
477 static void open_storage_file (struct storage_stream_info *info);
479 /* Initializes a storage sink. */
481 storage_sink_open (struct case_sink *sink)
483 struct storage_stream_info *info;
/* The stream info is stored in sink->aux. */
485 sink->aux = info = xmalloc (sizeof *info);
487 info->case_size = sink->value_cnt * sizeof (union value);
490 info->head = info->tail = NULL;
/* Once the workspace has overflowed in this session, a temporary file
   is opened right away. */
491 if (workspace_overflow)
494 open_storage_file (info);
/* Number of cases that fit in the workspace, counting the list node
   overhead of each case. */
499 info->max_cases = (get_max_workspace()
500 / (sizeof (struct case_list) + info->case_size));
504 /* Creates a new temporary file and puts it into INFO. */
506 open_storage_file (struct storage_stream_info *info)
/* tmpfile() returns NULL on failure, which is reported as an error. */
508 info->file = tmpfile ();
509 if (info->file == NULL)
511 msg (ME, _("An error occurred creating a temporary "
512 "file for use as the active file: %s."),
518 /* Writes the VALUE_CNT values in VALUES to FILE. */
520 write_storage_file (FILE *file, const union value *values, size_t value_cnt)
/* The whole record is written as a single item, so fwrite() returns 1 on
   success.
   NOTE(review): with value_cnt == 0 the item size is zero and fwrite()
   returns 0, which this test treats as a write error. */
522 if (fwrite (values, sizeof *values * value_cnt, 1, file) != 1)
524 msg (ME, _("An error occurred writing to a "
525 "temporary file used as the active file: %s."),
531 /* If INFO represents records in memory, moves them to disk.
532 Each comprises VALUE_CNT `union value's. */
534 storage_to_disk (struct storage_stream_info *info, size_t value_cnt)
536 struct case_list *cur, *next;
/* Nothing is done unless the cases are currently held in memory. */
538 if (info->mode == MEMORY)
541 open_storage_file (info);
/* Write each listed case to the file, then forget the list. */
542 for (cur = info->head; cur; cur = next)
545 write_storage_file (info->file, cur->c.data, value_cnt);
548 info->head = info->tail = NULL;
552 /* Destroys storage stream represented by INFO. */
554 destroy_storage_stream_info (struct storage_stream_info *info)
/* DISK mode deals with the FILE, which may be NULL; otherwise the linked
   list of cases starting at info->head is walked. */
556 if (info->mode == DISK)
558 if (info->file != NULL)
563 struct case_list *cur, *next;
565 for (cur = info->head; cur; cur = next)
574 /* Writes case C to the storage sink SINK. */
576 storage_sink_write (struct case_sink *sink, const struct ccase *c)
578 struct storage_stream_info *info = sink->aux;
/* In MEMORY mode the case is copied into a newly allocated list node. */
581 if (info->mode == MEMORY)
583 struct case_list *new_case;
/* The node is sized for value_cnt values; the "- 1" is presumably there
   because struct case_list already embeds one value.
   NOTE(review): confirm value_cnt can never be 0 here. */
586 new_case = xmalloc (sizeof (struct case_list)
587 + ((sink->value_cnt - 1) * sizeof (union value)));
588 memcpy (&new_case->c, c, sizeof (union value) * sink->value_cnt);
590 /* Append case to linked list. */
591 new_case->next = NULL;
592 if (info->head != NULL)
593 info->tail->next = new_case;
595 info->head = new_case;
596 info->tail = new_case;
/* NOTE(review): the overflow warning further down uses %d conversions,
   but its last argument is a sizeof-based expression of type size_t. */
598 /* Dump all the cases to disk if we've run out of
600 if (info->case_cnt > info->max_cases)
602 workspace_overflow = 1;
603 msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
604 "overflowed. Writing active file to disk."),
605 get_max_workspace() / 1024, info->max_cases,
606 sizeof (struct case_list) + info->case_size);
608 storage_to_disk (info, sink->value_cnt);
612 write_storage_file (info->file, c->data, sink->value_cnt);
615 /* Destroys internal data in SINK. */
617 storage_sink_destroy (struct case_sink *sink)
/* sink->aux holds the struct storage_stream_info set up by
   storage_sink_open(). */
619 destroy_storage_stream_info (sink->aux);
622 /* Closes and destroys the sink and returns a storage source to
623 read back the written data. */
624 static struct case_source *
625 storage_sink_make_source (struct case_sink *sink)
627 struct storage_stream_info *info = sink->aux;
/* A disk-backed stream is repositioned to its start so that it can be
   read back. */
629 if (info->mode == DISK)
631 /* Rewind the file. */
632 assert (info->file != NULL);
633 if (fseek (info->file, 0, SEEK_SET) != 0)
635 msg (ME, _("An error occurred while attempting to rewind a "
636 "temporary file used as the active file: %s."),
/* The same info object becomes the new source's auxiliary data. */
642 return create_case_source (&storage_source_class, sink->dict, info);
/* Class object for storage sinks; its initializer lists the sink's hook
   functions. */
646 const struct case_sink_class storage_sink_class =
651 storage_sink_destroy,
652 storage_sink_make_source,
655 /* Storage source. */
657 /* Returns the number of cases that will be read by
658 storage_source_read(). */
660 storage_source_count (const struct case_source *source)
/* source->aux is the struct storage_stream_info for this source. */
662 struct storage_stream_info *info = source->aux;
664 return info->case_cnt;
667 /* Reads all cases from the storage source and passes them one by one to write_case. */
670 storage_source_read (struct case_source *source,
/* NOTE(review): the write_case parameter below shadows the file-level
   write_case() function. */
672 write_case_func *write_case, write_case_data wc_data)
674 struct storage_stream_info *info = source->aux;
/* DISK: read case_cnt records of case_size bytes from the file, handing
   each to write_case and checking its result for zero. */
676 if (info->mode == DISK)
680 for (i = 0; i < info->case_cnt; i++)
682 if (!fread (c, info->case_size, 1, info->file))
684 msg (ME, _("An error occurred while attempting to read from "
685 "a temporary file created for the active file: %s."),
691 if (!write_case (wc_data))
/* In-memory list: copy the case out of the node at the head, hand it to
   write_case, then advance head to the next node. */
697 while (info->head != NULL)
699 struct case_list *iter = info->head;
700 memcpy (c, &iter->c, info->case_size);
701 if (!write_case (wc_data))
704 info->head = iter->next;
711 /* Destroys the source's internal data. */
713 storage_source_destroy (struct case_source *source)
/* source->aux is the struct storage_stream_info for this source. */
715 destroy_storage_stream_info (source->aux);
718 /* Storage source. */
/* Class object for storage sources; its initializer lists the source's
   hook functions. */
719 const struct case_source_class storage_source_class =
722 storage_source_count,
724 storage_source_destroy,
727 /* Returns nonzero only if SOURCE is stored on disk (instead of in memory). */
730 storage_source_on_disk (const struct case_source *source)
/* The answer comes straight from the stream's mode field. */
732 struct storage_stream_info *info = source->aux;
734 return info->mode == DISK;
737 /* Returns the list of cases in storage source SOURCE. */
739 storage_source_get_cases (const struct case_source *source)
741 struct storage_stream_info *info = source->aux;
/* Only valid for a source whose cases are held in memory. */
743 assert (info->mode == MEMORY);
747 /* Sets the list of cases in memory source SOURCE to CASES. */
749 storage_source_set_cases (const struct case_source *source,
750 struct case_list *cases)
/* SOURCE is const-qualified, but the stream info is reached through its
   aux pointer, which is not. */
752 struct storage_stream_info *info = source->aux;
/* Only valid for a source whose cases are held in memory. */
754 assert (info->mode == MEMORY);
758 /* If SOURCE has its cases in memory, writes them to disk. */
760 storage_source_to_disk (struct case_source *source)
762 struct storage_stream_info *info = source->aux;
/* storage_to_disk() itself checks that the cases are in memory. */
764 storage_to_disk (info, source->value_cnt);
767 /* Null sink. Used by a few procedures that keep track of output
768 themselves and would throw away anything that the sink contained anyway. */
/* Class object for the null sink. */
771 const struct case_sink_class null_sink_class =
780 /* Returns a pointer to the lagged case from N_BEFORE cases before the
781 current one, or NULL if there haven't been that many cases yet. */
783 lagged_case (int n_before)
/* Asking for more lag than the queue was sized for is a caller error. */
785 assert (n_before <= n_lag);
/* Not enough cases have been queued yet. */
786 if (n_before > lag_count)
/* Count back from lag_head, the slot where the next case will be stored.
   NOTE(review): this difference can be negative; presumably it is
   adjusted by n_lag before being used as an index -- confirm. */
790 int index = lag_head - n_before;
793 return lag_queue[index];
797 /* Appends TRNS to t_trns[], the list of all transformations to be
798 performed on data as it is read from the active file. */
800 add_transformation (struct trns_header * trns)
/* Grow t_trns when all m_trns allocated entries are in use. */
802 if (n_trns >= m_trns)
805 t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
/* Append TRNS and record its position in its own index field. */
807 t_trns[n_trns] = trns;
808 trns->index = n_trns++;
811 /* Cancels all active transformations, including any transformations
812 created by the input program. */
814 cancel_transformations (void)
/* Each transformation is released through its own free callback. */
817 for (i = 0; i < n_trns; i++)
820 t_trns[i]->free (t_trns[i]);
831 /* Creates a case source with class CLASS and auxiliary data AUX
832 and based on dictionary DICT. */
834 create_case_source (const struct case_source_class *class,
835 const struct dictionary *dict,
838 struct case_source *source = xmalloc (sizeof *source);
839 source->class = class;
/* A source's value count is DICT's next value index, whereas
   create_case_sink() uses the compacted value count. */
840 source->value_cnt = dict_get_next_value_idx (dict);
845 /* Returns nonzero if a case source is "complex". */
847 case_source_is_complex (const struct case_source *source)
/* "Complex" means the source's class is the input program class or the
   file type class. A NULL source is not complex. */
849 return source != NULL && (source->class == &input_program_source_class
850 || source->class == &file_type_source_class);
853 /* Returns nonzero if CLASS is the class of SOURCE. */
855 case_source_is_class (const struct case_source *source,
856 const struct case_source_class *class)
/* A NULL source belongs to no class, so the result is zero for it. */
858 return source != NULL && source->class == class;
861 /* Creates a case sink with class CLASS and auxiliary data AUX. */
864 create_case_sink (const struct case_sink_class *class,
865 const struct dictionary *dict,
868 struct case_sink *sink = xmalloc (sizeof *sink);
/* A sink uses DICT's compacted layout. idx_to_fv is later released with
   free() in free_case_sink(). */
871 sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
872 sink->value_cnt = dict_get_compacted_value_cnt (dict);
877 /* Destroys case sink SINK. It is the caller's responsibility to
878 call the sink's destroy function, if any. */
880 free_case_sink (struct case_sink *sink)
/* idx_to_fv was obtained from dict_get_compacted_idx_to_fv() in
   create_case_sink(). */
882 free (sink->idx_to_fv);
886 /* Represents auxiliary data for handling SPLIT FILE. */
887 struct split_aux_data
/* Built by procedure_with_splits() and passed to
   procedure_with_splits_callback() as its auxiliary pointer. */
889 size_t case_count; /* Number of cases so far. */
890 struct ccase *prev_case; /* Data in previous case. */
892 /* Functions to call... */
893 void (*begin_func) (void *); /* ...before data. */
894 int (*proc_func) (struct ccase *, void *); /* ...with data. */
895 void (*end_func) (void *); /* ...after data. */
896 void *func_aux; /* Auxiliary data. */
/* Forward declarations of the SPLIT FILE helpers defined later in this
   file. */
899 static int equal_splits (const struct ccase *, const struct ccase *);
900 static int procedure_with_splits_callback (struct ccase *, void *);
901 static void dump_splits (struct ccase *);
903 /* Like procedure(), but it automatically breaks the case stream
904 into SPLIT FILE break groups. Before each group of cases with
905 identical SPLIT FILE variable values, BEGIN_FUNC is called.
906 Then PROC_FUNC is called with each case in the group.
907 END_FUNC is called when the group is finished. FUNC_AUX is
908 passed to each of the functions as auxiliary data.
910 If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
911 and END_FUNC will be called at all.
913 If SPLIT FILE is not in effect, then there is one break group
914 (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
915 will be called once. */
917 procedure_with_splits (void (*begin_func) (void *aux),
918 int (*proc_func) (struct ccase *, void *aux),
919 void (*end_func) (void *aux),
922 struct split_aux_data split_aux;
924 split_aux.case_count = 0;
/* prev_case receives a copy of the case that starts each group (see
   procedure_with_splits_callback()). */
925 split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
926 split_aux.begin_func = begin_func;
927 split_aux.proc_func = proc_func;
928 split_aux.end_func = end_func;
929 split_aux.func_aux = func_aux;
931 procedure (procedure_with_splits_callback, &split_aux);
/* The last group still has to be finished, provided any case was seen
   and an END_FUNC was supplied. */
933 if (split_aux.case_count > 0 && end_func != NULL)
935 free (split_aux.prev_case);
938 /* procedure() callback used by procedure_with_splits(). */
940 procedure_with_splits_callback (struct ccase *c, void *split_aux_)
942 struct split_aux_data *split_aux = split_aux_;
944 /* Start a new series if needed. */
945 if (split_aux->case_count == 0
946 || !equal_splits (c, split_aux->prev_case))
/* Finish the previous group first, unless this is the very first case. */
948 if (split_aux->case_count > 0 && split_aux->end_func != NULL)
949 split_aux->end_func (split_aux->func_aux);
/* Remember this case as the reference for the new group. */
952 memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
954 if (split_aux->begin_func != NULL)
955 split_aux->begin_func (split_aux->func_aux);
958 split_aux->case_count++;
/* When a PROC_FUNC is supplied, its result is this callback's result. */
959 if (split_aux->proc_func != NULL)
960 return split_aux->proc_func (c, split_aux->func_aux);
965 /* Compares the SPLIT FILE variables in cases A and B and returns
966 nonzero only if they are equal. */
968 equal_splits (const struct ccase *a, const struct ccase *b)
970 struct variable *const *split;
/* Every SPLIT FILE variable of default_dict is compared: numeric values
   with !=, string values with memcmp over the variable's width. */
974 split = dict_get_split_vars (default_dict);
975 split_cnt = dict_get_split_cnt (default_dict);
976 for (i = 0; i < split_cnt; i++)
978 struct variable *v = split[i];
983 if (a->data[v->fv].f != b->data[v->fv].f)
987 if (memcmp (a->data[v->fv].s, b->data[v->fv].s, v->width))
998 /* Dumps out the values of all the split variables for the case C. */
1000 dump_splits (struct ccase *c)
1002 struct variable *const *split;
1003 struct tab_table *t;
1007 split_cnt = dict_get_split_cnt (default_dict);
/* The table is created with 3 columns and split_cnt + 1 rows: a heading
   row (Variable, Value, Label) followed by one row per split variable. */
1011 t = tab_create (3, split_cnt + 1, 0);
1012 tab_dim (t, tab_natural_dimensions);
1013 tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
1014 tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
1015 tab_text (t, 0, 0, TAB_NONE, _("Variable"));
1016 tab_text (t, 1, 0, TAB_LEFT, _("Value"));
1017 tab_text (t, 2, 0, TAB_LEFT, _("Label"));
1018 split = dict_get_split_vars (default_dict);
1019 for (i = 0; i < split_cnt; i++)
1021 struct variable *v = split[i];
1023 const char *val_lab;
1025 assert (v->type == NUMERIC || v->type == ALPHA);
1026 tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
/* Format the value with the variable's print format, then terminate the
   buffer at the format width.
   NOTE(review): confirm temp_buf has room for v->print.w + 1 bytes. */
1028 data_out (temp_buf, &v->print, &c->data[v->fv]);
1030 temp_buf[v->print.w] = 0;
1031 tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
/* Look up a value label for the third column. */
1033 val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
1035 tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
1037 tab_flags (t, SOMF_NO_TITLE);