Change "union value" to dynamically allocate long strings.
[pspp-builds.git] / src / data / casereader-filter.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader.h>
20
21 #include <stdlib.h>
22
23 #include <data/casereader-provider.h>
24 #include <data/casewriter.h>
25 #include <data/variable.h>
26 #include <data/dictionary.h>
27 #include <libpspp/taint.h>
28 #include <libpspp/message.h>
29
30 #include "xalloc.h"
31
32 #include "gettext.h"
33 #define _(msgid) gettext (msgid)
34
35 /* A casereader that filters data coming from another
36    casereader. */
37 struct casereader_filter
38   {
39     struct casereader *subreader; /* The reader to filter. */
40     bool (*include) (const struct ccase *, void *aux);
41     bool (*destroy) (void *aux);
42     void *aux;
43     struct casewriter *exclude; /* Writer that gets filtered cases, or NULL. */
44   };
45
46 static const struct casereader_class casereader_filter_class;
47
48 /* Creates and returns a casereader whose content is a filtered
49    version of the data in SUBREADER.  Only the cases for which
50    INCLUDE returns true will appear in the returned casereader,
51    in the original order.
52
53    If EXCLUDE is non-null, then cases for which INCLUDE returns
54    false are written to EXCLUDE.  These cases will not
55    necessarily be fully written to EXCLUDE until the filtering casereader's
56    cases have been fully read or, if that never occurs, until the
57    filtering casereader is destroyed.
58
59    When the filtering casereader is destroyed, DESTROY will be
60    called to allow any state maintained by INCLUDE to be freed.
61
62    After this function is called, SUBREADER must not ever again
63    be referenced directly.  It will be destroyed automatically
64    when the filtering casereader is destroyed. */
65 struct casereader *
66 casereader_create_filter_func (struct casereader *subreader,
67                                bool (*include) (const struct ccase *,
68                                                 void *aux),
69                                bool (*destroy) (void *aux),
70                                void *aux,
71                                struct casewriter *exclude)
72 {
73   struct casereader_filter *filter = xmalloc (sizeof *filter);
74   struct casereader *reader;
75   filter->subreader = casereader_rename (subreader);
76   filter->include = include;
77   filter->destroy = destroy;
78   filter->aux = aux;
79   filter->exclude = exclude;
80   reader = casereader_create_sequential (
81     NULL, casereader_get_proto (filter->subreader), CASENUMBER_MAX,
82     &casereader_filter_class, filter);
83   taint_propagate (casereader_get_taint (filter->subreader),
84                    casereader_get_taint (reader));
85   return reader;
86 }
87
88 /* Internal read function for filtering casereader. */
89 static struct ccase *
90 casereader_filter_read (struct casereader *reader UNUSED, void *filter_)
91
92 {
93   struct casereader_filter *filter = filter_;
94   for (;;)
95     {
96       struct ccase *c = casereader_read (filter->subreader);
97       if (c == NULL)
98         return NULL;
99       else if (filter->include (c, filter->aux))
100         return c;
101       else if (filter->exclude != NULL)
102         casewriter_write (filter->exclude, c);
103       else
104         case_unref (c);
105     }
106 }
107
108 /* Internal destruction function for filtering casereader. */
109 static void
110 casereader_filter_destroy (struct casereader *reader, void *filter_)
111 {
112   struct casereader_filter *filter = filter_;
113
114   /* Make sure we've written everything to the excluded cases
115      casewriter, if there is one. */
116   if (filter->exclude != NULL)
117     {
118       struct ccase *c;
119       while ((c = casereader_read (filter->subreader)) != NULL)
120         if (filter->include (c, filter->aux))
121           case_unref (c);
122         else
123           casewriter_write (filter->exclude, c);
124     }
125
126   casereader_destroy (filter->subreader);
127   if (filter->destroy != NULL && !filter->destroy (filter->aux))
128     casereader_force_error (reader);
129   free (filter);
130 }
131
132 /* Filtering casereader class. */
133 static const struct casereader_class casereader_filter_class =
134   {
135     casereader_filter_read,
136     casereader_filter_destroy,
137
138     /* We could in fact delegate clone to the subreader, if the
139        filter function is required to have no memory and if we
140        added reference counting.  But it might be useful to have
141        filter functions with memory and in any case this would
142        require a little extra work. */
143     NULL,
144     NULL,
145   };
146
147 \f
148 /* Casereader for filtering valid weights. */
149
150 /* Weight-filtering data. */
151 struct casereader_filter_weight
152   {
153     const struct variable *weight_var; /* Weight variable. */
154     bool *warn_on_invalid;      /* Have we already issued an error? */
155     bool local_warn_on_invalid; /* warn_on_invalid might point here. */
156   };
157
158 static bool casereader_filter_weight_include (const struct ccase *, void *);
159 static bool casereader_filter_weight_destroy (void *);
160
161 /* Creates and returns a casereader that filters cases from
162    READER by valid weights, that is, any cases with user- or
163    system-missing, zero, or negative weights are dropped.  The
164    weight variable's information is taken from DICT.  If DICT
165    does not have a weight variable, then no cases are filtered
166    out.
167
168    When a case with an invalid weight is encountered,
169    *WARN_ON_INVALID is checked.  If it is true, then an error
170    message is issued and *WARN_ON_INVALID is set false.  If
171    WARN_ON_INVALID is a null pointer, then an internal bool that
172    is initially true is used instead of a caller-supplied bool.
173
174    If EXCLUDE is non-null, then dropped cases are written to
175    EXCLUDE.  These cases will not necessarily be fully written to
176    EXCLUDE until the filtering casereader's cases have been fully
177    read or, if that never occurs, until the filtering casereader
178    is destroyed.
179
180    After this function is called, READER must not ever again be
181    referenced directly.  It will be destroyed automatically when
182    the filtering casereader is destroyed. */
183 struct casereader *
184 casereader_create_filter_weight (struct casereader *reader,
185                                  const struct dictionary *dict,
186                                  bool *warn_on_invalid,
187                                  struct casewriter *exclude)
188 {
189   struct variable *weight_var = dict_get_weight (dict);
190   if (weight_var != NULL)
191     {
192       struct casereader_filter_weight *cfw = xmalloc (sizeof *cfw);
193       cfw->weight_var = weight_var;
194       cfw->warn_on_invalid = (warn_on_invalid
195                                ? warn_on_invalid
196                                : &cfw->local_warn_on_invalid);
197       cfw->local_warn_on_invalid = true;
198       reader = casereader_create_filter_func (reader,
199                                               casereader_filter_weight_include,
200                                               casereader_filter_weight_destroy,
201                                               cfw, exclude);
202     }
203   else
204     reader = casereader_rename (reader);
205   return reader;
206 }
207
208 /* Internal "include" function for weight-filtering
209    casereader. */
210 static bool
211 casereader_filter_weight_include (const struct ccase *c, void *cfw_)
212 {
213   struct casereader_filter_weight *cfw = cfw_;
214   double value = case_num (c, cfw->weight_var);
215   if (value >= 0.0 && !var_is_num_missing (cfw->weight_var, value, MV_ANY))
216     return true;
217   else
218     {
219       if (*cfw->warn_on_invalid)
220         {
221           msg (SW, _("At least one case in the data read had a weight value "
222                      "that was user-missing, system-missing, zero, or "
223                      "negative.  These case(s) were ignored."));
224           *cfw->warn_on_invalid = false;
225         }
226       return false;
227     }
228 }
229
230 /* Internal "destroy" function for weight-filtering
231    casereader. */
232 static bool
233 casereader_filter_weight_destroy (void *cfw_)
234 {
235   struct casereader_filter_weight *cfw = cfw_;
236   free (cfw);
237   return true;
238 }
239 \f
240 /* Casereader for filtering missing values. */
241
242 /* Missing-value filtering data. */
243 struct casereader_filter_missing
244   {
245     struct variable **vars;     /* Variables whose values to filter. */
246     size_t var_cnt;             /* Number of variables. */
247     enum mv_class class;        /* Types of missing values to filter. */
248     casenumber *n_missing;
249   };
250
251 static bool casereader_filter_missing_include (const struct ccase *, void *);
252 static bool casereader_filter_missing_destroy (void *);
253
254 /* Creates and returns a casereader that filters out cases from
255    READER that have a missing value in the given CLASS for any of
256    the VAR_CNT variables in VARS.  Only cases that have
257    non-missing values for all of these variables are passed
258    through.
259
260    Ownership of VARS is retained by the caller.
261
262    If EXCLUDE is non-null, then dropped cases are written to
263    EXCLUDE.  These cases will not necessarily be fully written to
264    EXCLUDE until the filtering casereader's cases have been fully
265    read or, if that never occurs, until the filtering casereader
266    is destroyed.
267
268    If N_MISSING is non-null, then after reading, it will be filled
269    with the totla number of dropped cases.
270
271    After this function is called, READER must not ever again
272    be referenced directly.  It will be destroyed automatically
273    when the filtering casereader is destroyed. */
274 struct casereader *
275 casereader_create_filter_missing (struct casereader *reader,
276                                   const struct variable **vars, size_t var_cnt,
277                                   enum mv_class class,
278                                   casenumber *n_missing,
279                                   struct casewriter *exclude)
280 {
281   if (var_cnt > 0 && class != MV_NEVER)
282     {
283       struct casereader_filter_missing *cfm = xmalloc (sizeof *cfm);
284       cfm->vars = xmemdup (vars, sizeof *vars * var_cnt);
285       cfm->var_cnt = var_cnt;
286       cfm->class = class;
287       cfm->n_missing = n_missing;
288       if (n_missing) *n_missing = 0;
289       return casereader_create_filter_func (reader,
290                                             casereader_filter_missing_include,
291                                             casereader_filter_missing_destroy,
292                                             cfm,
293                                             exclude);
294     }
295   else
296     return casereader_rename (reader);
297 }
298
299 /* Internal "include" function for missing value-filtering
300    casereader. */
301 static bool
302 casereader_filter_missing_include (const struct ccase *c, void *cfm_)
303 {
304   const struct casereader_filter_missing *cfm = cfm_;
305   size_t i;
306
307   for (i = 0; i < cfm->var_cnt; i++)
308     {
309       struct variable *var = cfm->vars[i];
310       const union value *value = case_data (c, var);
311       if (var_is_value_missing (var, value, cfm->class))
312         {
313           if ( cfm->n_missing )
314             (*cfm->n_missing)++;
315           return false;
316         }
317     }
318   return true;
319 }
320
321 /* Internal "destroy" function for missing value-filtering
322    casereader. */
323 static bool
324 casereader_filter_missing_destroy (void *cfm_)
325 {
326   struct casereader_filter_missing *cfm = cfm_;
327   free (cfm->vars);
328   free (cfm);
329   return true;
330 }
331 \f
332 /* Case-counting casereader. */
333
334 static bool casereader_counter_include (const struct ccase *, void *);
335
336 /* Creates and returns a new casereader that counts the number of
337    cases that have been read from it.  *COUNTER is initially set
338    to INITIAL_VALUE, then incremented by 1 each time a case is read.
339
340    Counting casereaders must be used very cautiously: if a
341    counting casereader is cloned or if the casereader_peek
342    function is used on it, then the counter's value can be higher
343    than expected because of the buffering that goes on behind the
344    scenes.
345
346    The counter is only incremented as cases are actually read
347    from the casereader.  In particular, if the casereader is
348    destroyed before all cases have been read from the casereader,
349    cases never read will not be included in the count.
350
351    After this function is called, READER must not ever again
352    be referenced directly.  It will be destroyed automatically
353    when the filtering casereader is destroyed. */
354 struct casereader *
355 casereader_create_counter (struct casereader *reader, casenumber *counter,
356                            casenumber initial_value)
357 {
358   *counter = initial_value;
359   return casereader_create_filter_func (reader, casereader_counter_include,
360                                         NULL, counter, NULL);
361 }
362
363 /* Internal "include" function for counting casereader. */
364 static bool
365 casereader_counter_include (const struct ccase *c UNUSED, void *counter_)
366 {
367   casenumber *counter = counter_;
368   ++*counter;
369   return true;
370 }