86788ba7a3bc5afcbd280ad2be920685a48e76db
[pspp-builds.git] / src / data / casegrouper.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casegrouper.h>
20
21 #include <stdlib.h>
22
23 #include <data/casereader.h>
24 #include <data/casewriter.h>
25 #include <data/dictionary.h>
26 #include <data/subcase.h>
27 #include <libpspp/taint.h>
28
29 #include "xalloc.h"
30
31 /* A casegrouper. */
32 struct casegrouper
33   {
34     struct casereader *reader;  /* Source of input cases. */
35     struct taint *taint;        /* Error status for casegrouper. */
36
37     /* Functions for grouping cases. */
38     bool (*same_group) (const struct ccase *, const struct ccase *, void *aux);
39     void (*destroy) (void *aux);
40     void *aux;
41   };
42
43 /* Creates and returns a new casegrouper that takes its input
44    from READER.  SAME_GROUP is used to decide which cases are in
45    a group: it returns true if the pair of cases provided are in
46    the same group, false otherwise.  DESTROY will be called when
47    the casegrouper is destroyed and should free any storage
48    needed by SAME_GROUP.
49
50    SAME_GROUP may be a null pointer.  If so, READER's entire
51    contents is considered to be a single group. */
52 struct casegrouper *
53 casegrouper_create_func (struct casereader *reader,
54                          bool (*same_group) (const struct ccase *,
55                                              const struct ccase *,
56                                              void *aux),
57                          void (*destroy) (void *aux),
58                          void *aux)
59 {
60   struct casegrouper *grouper = xmalloc (sizeof *grouper);
61   grouper->reader = casereader_rename (reader);
62   grouper->taint = taint_clone (casereader_get_taint (grouper->reader));
63   grouper->same_group = same_group;
64   grouper->destroy = destroy;
65   grouper->aux = aux;
66   return grouper;
67 }
68
69 /* Obtains the next group of cases from GROUPER.  Returns true if
70    successful, false if no groups remain.  If successful, *READER
71    is set to the casereader for the new group; otherwise, it is
72    set to NULL. */
73 bool
74 casegrouper_get_next_group (struct casegrouper *grouper,
75                             struct casereader **reader)
76 {
77   /* FIXME: we really shouldn't need a temporary casewriter for
78      the common case where we read an entire group's data before
79      going on to the next. */
80   if (grouper->same_group != NULL)
81     {
82       struct casewriter *writer;
83       struct ccase *group_case, *tmp;
84
85       group_case = casereader_read (grouper->reader);
86       if (group_case == NULL)
87         {
88           *reader = NULL;
89           return false;
90         }
91
92       writer = autopaging_writer_create (casereader_get_value_cnt (grouper->reader));
93       case_ref (group_case);
94       casewriter_write (writer, group_case);
95
96       while ((tmp = casereader_peek (grouper->reader, 0)) != NULL
97              && grouper->same_group (group_case, tmp, grouper->aux))
98         {
99           case_unref (casereader_read (grouper->reader));
100           casewriter_write (writer, tmp);
101         }
102       case_unref (group_case);
103
104       *reader = casewriter_make_reader (writer);
105       return true;
106     }
107   else
108     {
109       if (grouper->reader != NULL)
110         {
111           if (!casereader_is_empty (grouper->reader))
112             {
113               *reader = grouper->reader;
114               grouper->reader = NULL;
115               return true;
116             }
117           else
118             {
119               casereader_destroy (grouper->reader);
120               grouper->reader = NULL;
121               return false;
122             }
123         }
124       else
125         {
126           *reader = NULL;
127           return false;
128         }
129     }
130 }
131
132 /* Destroys GROUPER.  Returns false if GROUPER's input casereader
133    or any state derived from it had become tainted, which means
134    that an I/O error or other serious error occurred in
135    processing data derived from GROUPER; otherwise, return true. */
136 bool
137 casegrouper_destroy (struct casegrouper *grouper)
138 {
139   if (grouper != NULL)
140     {
141       struct taint *taint = grouper->taint;
142       bool ok;
143
144       casereader_destroy (grouper->reader);
145       if (grouper->destroy != NULL)
146         grouper->destroy (grouper->aux);
147       free (grouper);
148
149       ok = !taint_has_tainted_successor (taint);
150       taint_destroy (taint);
151       return ok;
152     }
153   else
154     return true;
155 }
156 \f
157 /* Casegrouper based on equal values of variables from case to
158    case. */
159
160 static bool casegrouper_vars_same_group (const struct ccase *,
161                                          const struct ccase *,
162                                          void *);
163 static void casegrouper_vars_destroy (void *);
164
165 /* Creates and returns a casegrouper that reads data from READER
166    and breaks it into contiguous groups of cases that have equal
167    values for the VAR_CNT variables in VARS.  If VAR_CNT is 0,
168    then all the cases will be put in a single group. */
169 struct casegrouper *
170 casegrouper_create_vars (struct casereader *reader,
171                          const struct variable *const *vars,
172                          size_t var_cnt)
173 {
174   if (var_cnt > 0) 
175     {
176       struct subcase *sc = xmalloc (sizeof *sc);
177       subcase_init_vars (sc, vars, var_cnt);
178       return casegrouper_create_func (reader, casegrouper_vars_same_group,
179                                       casegrouper_vars_destroy, sc); 
180     }
181   else
182     return casegrouper_create_func (reader, NULL, NULL, NULL);
183 }
184
185 /* Creates and returns a casegrouper that reads data from READER
186    and breaks it into contiguous groups of cases that have equal
187    values for the SPLIT FILE variables in DICT.  If DICT has no
188    SPLIT FILE variables, then all the cases will be put into a
189    single group. */
190 struct casegrouper *
191 casegrouper_create_splits (struct casereader *reader,
192                            const struct dictionary *dict)
193 {
194   return casegrouper_create_vars (reader,
195                                   dict_get_split_vars (dict),
196                                   dict_get_split_cnt (dict));
197 }
198
199 /* Creates and returns a casegrouper that reads data from READER
200    and breaks it into contiguous groups of cases that have equal
201    values for the variables used for sorting in SC.  If SC is
202    empty (contains no fields), then all the cases will be put
203    into a single group. */
204 struct casegrouper *
205 casegrouper_create_subcase (struct casereader *reader,
206                             const struct subcase *sc)
207 {
208   if (subcase_get_n_fields (sc) > 0) 
209     {
210       struct subcase *sc_copy = xmalloc (sizeof *sc);
211       subcase_clone (sc_copy, sc);
212       return casegrouper_create_func (reader, casegrouper_vars_same_group,
213                                       casegrouper_vars_destroy, sc_copy); 
214     }
215   else
216     return casegrouper_create_func (reader, NULL, NULL, NULL);
217 }
218
219 /* "same_group" function for an equal-variables casegrouper. */
220 static bool
221 casegrouper_vars_same_group (const struct ccase *a, const struct ccase *b,
222                              void *sc_)
223 {
224   struct subcase *sc = sc_;
225   return subcase_equal (sc, a, sc, b);
226 }
227
228 /* "destroy" for an equal-variables casegrouper. */
229 static void
230 casegrouper_vars_destroy (void *sc_)
231 {
232   struct subcase *sc = sc_;
233   if (sc != NULL) 
234     {
235       subcase_destroy (sc);
236       free (sc); 
237     }
238 }