eb8d65deee739b7d9e7269844e490bd3329d5ca9
[pspp] / src / data / casegrouper.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2011, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/casegrouper.h"
20
21 #include <stdlib.h>
22
23 #include "data/casereader.h"
24 #include "data/casewriter.h"
25 #include "data/dictionary.h"
26 #include "data/subcase.h"
27 #include "libpspp/taint.h"
28
29 #include "gl/xalloc.h"
30
31 /* A casegrouper. */
32 struct casegrouper
33   {
34     struct casereader *reader;  /* Source of input cases. */
35     struct taint *taint;        /* Error status for casegrouper. */
36
37     /* Functions for grouping cases. */
38     bool (*same_group) (const struct ccase *, const struct ccase *, void *aux);
39     void (*destroy) (void *aux);
40     void *aux;
41   };
42
43 /* Creates and returns a new casegrouper that takes its input
44    from READER.  SAME_GROUP is used to decide which cases are in
45    a group: it returns true if the pair of cases provided are in
46    the same group, false otherwise.  DESTROY will be called when
47    the casegrouper is destroyed and should free any storage
48    needed by SAME_GROUP.
49
50    Takes ownerhip of READER.
51
52    SAME_GROUP may be a null pointer.  If so, READER's entire
53    contents is considered to be a single group. */
54 struct casegrouper *
55 casegrouper_create_func (struct casereader *reader,
56                          bool (*same_group) (const struct ccase *,
57                                              const struct ccase *,
58                                              void *aux),
59                          void (*destroy) (void *aux),
60                          void *aux)
61 {
62   struct casegrouper *grouper = xmalloc (sizeof *grouper);
63   grouper->reader = casereader_rename (reader);
64   grouper->taint = taint_clone (casereader_get_taint (grouper->reader));
65   grouper->same_group = same_group;
66   grouper->destroy = destroy;
67   grouper->aux = aux;
68   return grouper;
69 }
70
71 /* Obtains the next group of cases from GROUPER.  Returns true if
72    successful, false if no groups remain.  If successful, *READER
73    is set to the casereader for the new group; otherwise, it is
74    set to NULL. */
75 bool
76 casegrouper_get_next_group (struct casegrouper *grouper,
77                             struct casereader **reader)
78 {
79   /* FIXME: we really shouldn't need a temporary casewriter for
80      the common case where we read an entire group's data before
81      going on to the next. */
82   if (grouper->same_group != NULL)
83     {
84       struct casewriter *writer;
85       struct ccase *group_case, *tmp;
86
87       group_case = casereader_read (grouper->reader);
88       if (group_case == NULL)
89         {
90           *reader = NULL;
91           return false;
92         }
93
94       writer = autopaging_writer_create (
95         casereader_get_proto (grouper->reader));
96
97       casewriter_write (writer, case_ref (group_case));
98
99       while ((tmp = casereader_peek (grouper->reader, 0)) != NULL
100              && grouper->same_group (group_case, tmp, grouper->aux))
101         {
102           case_unref (casereader_read (grouper->reader));
103           casewriter_write (writer, tmp);
104         }
105       case_unref (tmp);
106       case_unref (group_case);
107
108       *reader = casewriter_make_reader (writer);
109       return true;
110     }
111   else
112     {
113       if (grouper->reader != NULL)
114         {
115           if (!casereader_is_empty (grouper->reader))
116             {
117               *reader = grouper->reader;
118               grouper->reader = NULL;
119               return true;
120             }
121           else
122             {
123               casereader_destroy (grouper->reader);
124               grouper->reader = NULL;
125               return false;
126             }
127         }
128       else
129         {
130           *reader = NULL;
131           return false;
132         }
133     }
134 }
135
136 /* Destroys GROUPER.  Returns false if GROUPER's input casereader
137    or any state derived from it had become tainted, which means
138    that an I/O error or other serious error occurred in
139    processing data derived from GROUPER; otherwise, return true. */
140 bool
141 casegrouper_destroy (struct casegrouper *grouper)
142 {
143   if (grouper != NULL)
144     {
145       struct taint *taint = grouper->taint;
146       bool ok;
147
148       casereader_destroy (grouper->reader);
149       if (grouper->destroy != NULL)
150         grouper->destroy (grouper->aux);
151       free (grouper);
152
153       ok = !taint_has_tainted_successor (taint);
154       taint_destroy (taint);
155       return ok;
156     }
157   else
158     return true;
159 }
160 \f
161 /* Casegrouper based on equal values of variables from case to
162    case. */
163
164 static bool casegrouper_vars_same_group (const struct ccase *,
165                                          const struct ccase *,
166                                          void *);
167 static void casegrouper_vars_destroy (void *);
168
169 /* Creates and returns a casegrouper that reads data from READER
170    and breaks it into contiguous groups of cases that have equal
171    values for the VAR_CNT variables in VARS.  If VAR_CNT is 0,
172    then all the cases will be put in a single group.
173
174    Takes ownerhip of READER. */
175 struct casegrouper *
176 casegrouper_create_vars (struct casereader *reader,
177                          const struct variable *const *vars,
178                          size_t var_cnt)
179 {
180   if (var_cnt > 0)
181     {
182       struct subcase *sc = xmalloc (sizeof *sc);
183       subcase_init_vars (sc, vars, var_cnt);
184       return casegrouper_create_func (reader, casegrouper_vars_same_group,
185                                       casegrouper_vars_destroy, sc);
186     }
187   else
188     return casegrouper_create_func (reader, NULL, NULL, NULL);
189 }
190
191 /* Creates and returns a casegrouper that reads data from READER
192    and breaks it into contiguous groups of cases that have equal
193    values for the SPLIT FILE variables in DICT.  If DICT has no
194    SPLIT FILE variables, then all the cases will be put into a
195    single group.
196
197    Takes ownerhip of READER. */
198 struct casegrouper *
199 casegrouper_create_splits (struct casereader *reader,
200                            const struct dictionary *dict)
201 {
202   return casegrouper_create_vars (reader,
203                                   dict_get_split_vars (dict),
204                                   dict_get_split_cnt (dict));
205 }
206
207 /* Creates and returns a casegrouper that reads data from READER
208    and breaks it into contiguous groups of cases that have equal
209    values for the variables used for sorting in SC.  If SC is
210    empty (contains no fields), then all the cases will be put
211    into a single group.
212
213    Takes ownerhip of READER. */
214 struct casegrouper *
215 casegrouper_create_subcase (struct casereader *reader,
216                             const struct subcase *sc)
217 {
218   if (subcase_get_n_fields (sc) > 0)
219     {
220       struct subcase *sc_copy = xmalloc (sizeof *sc);
221       subcase_clone (sc_copy, sc);
222       return casegrouper_create_func (reader, casegrouper_vars_same_group,
223                                       casegrouper_vars_destroy, sc_copy);
224     }
225   else
226     return casegrouper_create_func (reader, NULL, NULL, NULL);
227 }
228
229 /* "same_group" function for an equal-variables casegrouper. */
230 static bool
231 casegrouper_vars_same_group (const struct ccase *a, const struct ccase *b,
232                              void *sc_)
233 {
234   struct subcase *sc = sc_;
235   return subcase_equal (sc, a, sc, b);
236 }
237
238 /* "destroy" for an equal-variables casegrouper. */
239 static void
240 casegrouper_vars_destroy (void *sc_)
241 {
242   struct subcase *sc = sc_;
243   if (sc != NULL)
244     {
245       subcase_destroy (sc);
246       free (sc);
247     }
248 }