8a392afbab8be62531b68c9f3890818e88df4630
[pspp-builds.git] / src / data / casegrouper.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2007 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <data/casegrouper.h>
22
23 #include <stdlib.h>
24
25 #include <data/case-ordering.h>
26 #include <data/casereader.h>
27 #include <data/casewriter.h>
28 #include <data/dictionary.h>
29 #include <libpspp/taint.h>
30
31 #include "xalloc.h"
32
33 /* A casegrouper. */
34 struct casegrouper
35   {
36     struct casereader *reader;  /* Source of input cases. */
37     struct taint *taint;        /* Error status for casegrouper. */
38
39     /* Functions for grouping cases. */
40     bool (*same_group) (const struct ccase *, const struct ccase *, void *aux);
41     void (*destroy) (void *aux);
42     void *aux;
43   };
44
45 /* Creates and returns a new casegrouper that takes its input
46    from READER.  SAME_GROUP is used to decide which cases are in
47    a group: it returns true if the pair of cases provided are in
48    the same group, false otherwise.  DESTROY will be called when
49    the casegrouper is destroyed and should free any storage
50    needed by SAME_GROUP.
51
52    SAME_GROUP may be a null pointer.  If so, READER's entire
53    contents is considered to be a single group. */
54 struct casegrouper *
55 casegrouper_create_func (struct casereader *reader,
56                          bool (*same_group) (const struct ccase *,
57                                              const struct ccase *,
58                                              void *aux),
59                          void (*destroy) (void *aux),
60                          void *aux)
61 {
62   struct casegrouper *grouper = xmalloc (sizeof *grouper);
63   grouper->reader = casereader_rename (reader);
64   grouper->taint = taint_clone (casereader_get_taint (grouper->reader));
65   grouper->same_group = same_group;
66   grouper->destroy = destroy;
67   grouper->aux = aux;
68   return grouper;
69 }
70
71 /* Obtains the next group of cases from GROUPER.  Returns true if
72    successful, false if no groups remain.  If successful, *READER
73    is set to the casereader for the new group; otherwise, it is
74    set to NULL. */
75 bool
76 casegrouper_get_next_group (struct casegrouper *grouper,
77                             struct casereader **reader)
78 {
79   /* FIXME: we really shouldn't need a temporary casewriter for
80      the common case where we read an entire group's data before
81      going on to the next. */
82   if (grouper->same_group != NULL)
83     {
84       struct casewriter *writer;
85       struct ccase group_case, tmp;
86
87       if (!casereader_read (grouper->reader, &group_case))
88         {
89           *reader = NULL;
90           return false;
91         }
92
93       writer = autopaging_writer_create (casereader_get_value_cnt (grouper->reader));
94       case_clone (&tmp, &group_case);
95       casewriter_write (writer, &tmp);
96
97       while (casereader_peek (grouper->reader, 0, &tmp)
98              && grouper->same_group (&group_case, &tmp, grouper->aux))
99         {
100           struct ccase discard;
101           casereader_read (grouper->reader, &discard);
102           case_destroy (&discard);
103           casewriter_write (writer, &tmp);
104         }
105       case_destroy (&tmp);
106       case_destroy (&group_case);
107
108       *reader = casewriter_make_reader (writer);
109       return true;
110     }
111   else
112     {
113       if (grouper->reader != NULL)
114         {
115           *reader = grouper->reader;
116           grouper->reader = NULL;
117           return true;
118         }
119       else
120         {
121           *reader = NULL;
122           return false;
123         }
124     }
125 }
126
127 /* Destroys GROUPER.  Returns false if GROUPER's input casereader
128    or any state derived from it had become tainted, which means
129    that an I/O error or other serious error occurred in
130    processing data derived from GROUPER; otherwise, return true. */
131 bool
132 casegrouper_destroy (struct casegrouper *grouper)
133 {
134   if (grouper != NULL)
135     {
136       struct taint *taint = grouper->taint;
137       bool ok;
138
139       casereader_destroy (grouper->reader);
140       if (grouper->destroy != NULL)
141         grouper->destroy (grouper->aux);
142       free (grouper);
143
144       ok = !taint_has_tainted_successor (taint);
145       taint_destroy (taint);
146       return ok;
147     }
148   else
149     return true;
150 }
151 \f
152 /* Casegrouper based on equal values of variables from case to
153    case. */
154
155 /* Casegrouper based on equal variables. */
156 struct casegrouper_vars
157   {
158     const struct variable **vars; /* Variables to compare. */
159     size_t var_cnt;               /* Number of variables. */
160   };
161
162 static bool casegrouper_vars_same_group (const struct ccase *,
163                                          const struct ccase *,
164                                          void *);
165 static void casegrouper_vars_destroy (void *);
166
167 /* Creates and returns a casegrouper that reads data from READER
168    and breaks it into contiguous groups of cases that have equal
169    values for the VAR_CNT variables in VARS.  If VAR_CNT is 0,
170    then all the cases will be put in a single group. */
171 struct casegrouper *
172 casegrouper_create_vars (struct casereader *reader,
173                          const struct variable *const *vars,
174                          size_t var_cnt)
175 {
176   if (var_cnt > 0)
177     {
178       struct casegrouper_vars *cv = xmalloc (sizeof *cv);
179       cv->vars = xmemdup (vars, sizeof *vars * var_cnt);
180       cv->var_cnt = var_cnt;
181       return casegrouper_create_func (reader,
182                                       casegrouper_vars_same_group,
183                                       casegrouper_vars_destroy,
184                                       cv);
185     }
186   else
187     return casegrouper_create_func (reader, NULL, NULL, NULL);
188 }
189
190 /* Creates and returns a casegrouper that reads data from READER
191    and breaks it into contiguous groups of cases that have equal
192    values for the SPLIT FILE variables in DICT.  If DICT has no
193    SPLIT FILE variables, then all the cases will be put into a
194    single group. */
195 struct casegrouper *
196 casegrouper_create_splits (struct casereader *reader,
197                            const struct dictionary *dict)
198 {
199   return casegrouper_create_vars (reader,
200                                   dict_get_split_vars (dict),
201                                   dict_get_split_cnt (dict));
202 }
203
204 /* Creates and returns a casegrouper that reads data from READER
205    and breaks it into contiguous groups of cases that have equal
206    values for the variables used for sorting in CO.  If CO is
207    empty (contains no sort keys), then all the cases will be put
208    into a single group. */
209 struct casegrouper *
210 casegrouper_create_case_ordering (struct casereader *reader,
211                                   const struct case_ordering *co)
212 {
213   const struct variable **vars;
214   size_t var_cnt;
215   struct casegrouper *grouper;
216
217   case_ordering_get_vars (co, &vars, &var_cnt);
218   grouper = casegrouper_create_vars (reader, vars, var_cnt);
219   free (vars);
220
221   return grouper;
222 }
223
224 /* "same_group" function for an equal-variables casegrouper. */
225 static bool
226 casegrouper_vars_same_group (const struct ccase *a, const struct ccase *b,
227                              void *cv_)
228 {
229   struct casegrouper_vars *cv = cv_;
230   return case_compare (a, b, cv->vars, cv->var_cnt) == 0;
231 }
232
233 /* "destroy" for an equal-variables casegrouper. */
234 static void
235 casegrouper_vars_destroy (void *cv_)
236 {
237   struct casegrouper_vars *cv = cv_;
238   free (cv->vars);
239   free (cv);
240 }
241