Change "union value" to dynamically allocate long strings.
[pspp-builds.git] / src / data / casegrouper.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casegrouper.h>
20
21 #include <stdlib.h>
22
23 #include <data/casereader.h>
24 #include <data/casewriter.h>
25 #include <data/dictionary.h>
26 #include <data/subcase.h>
27 #include <libpspp/taint.h>
28
29 #include "xalloc.h"
30
31 /* A casegrouper. */
32 struct casegrouper
33   {
34     struct casereader *reader;  /* Source of input cases. */
35     struct taint *taint;        /* Error status for casegrouper. */
36
37     /* Functions for grouping cases. */
38     bool (*same_group) (const struct ccase *, const struct ccase *, void *aux);
39     void (*destroy) (void *aux);
40     void *aux;
41   };
42
43 /* Creates and returns a new casegrouper that takes its input
44    from READER.  SAME_GROUP is used to decide which cases are in
45    a group: it returns true if the pair of cases provided are in
46    the same group, false otherwise.  DESTROY will be called when
47    the casegrouper is destroyed and should free any storage
48    needed by SAME_GROUP.
49
50    SAME_GROUP may be a null pointer.  If so, READER's entire
51    contents is considered to be a single group. */
52 struct casegrouper *
53 casegrouper_create_func (struct casereader *reader,
54                          bool (*same_group) (const struct ccase *,
55                                              const struct ccase *,
56                                              void *aux),
57                          void (*destroy) (void *aux),
58                          void *aux)
59 {
60   struct casegrouper *grouper = xmalloc (sizeof *grouper);
61   grouper->reader = casereader_rename (reader);
62   grouper->taint = taint_clone (casereader_get_taint (grouper->reader));
63   grouper->same_group = same_group;
64   grouper->destroy = destroy;
65   grouper->aux = aux;
66   return grouper;
67 }
68
69 /* Obtains the next group of cases from GROUPER.  Returns true if
70    successful, false if no groups remain.  If successful, *READER
71    is set to the casereader for the new group; otherwise, it is
72    set to NULL. */
73 bool
74 casegrouper_get_next_group (struct casegrouper *grouper,
75                             struct casereader **reader)
76 {
77   /* FIXME: we really shouldn't need a temporary casewriter for
78      the common case where we read an entire group's data before
79      going on to the next. */
80   if (grouper->same_group != NULL)
81     {
82       struct casewriter *writer;
83       struct ccase *group_case, *tmp;
84
85       group_case = casereader_read (grouper->reader);
86       if (group_case == NULL)
87         {
88           *reader = NULL;
89           return false;
90         }
91
92       writer = autopaging_writer_create (
93         casereader_get_proto (grouper->reader));
94       case_ref (group_case);
95       casewriter_write (writer, group_case);
96
97       while ((tmp = casereader_peek (grouper->reader, 0)) != NULL
98              && grouper->same_group (group_case, tmp, grouper->aux))
99         {
100           case_unref (casereader_read (grouper->reader));
101           casewriter_write (writer, tmp);
102         }
103       case_unref (group_case);
104
105       *reader = casewriter_make_reader (writer);
106       return true;
107     }
108   else
109     {
110       if (grouper->reader != NULL)
111         {
112           if (!casereader_is_empty (grouper->reader))
113             {
114               *reader = grouper->reader;
115               grouper->reader = NULL;
116               return true;
117             }
118           else
119             {
120               casereader_destroy (grouper->reader);
121               grouper->reader = NULL;
122               return false;
123             }
124         }
125       else
126         {
127           *reader = NULL;
128           return false;
129         }
130     }
131 }
132
133 /* Destroys GROUPER.  Returns false if GROUPER's input casereader
134    or any state derived from it had become tainted, which means
135    that an I/O error or other serious error occurred in
136    processing data derived from GROUPER; otherwise, return true. */
137 bool
138 casegrouper_destroy (struct casegrouper *grouper)
139 {
140   if (grouper != NULL)
141     {
142       struct taint *taint = grouper->taint;
143       bool ok;
144
145       casereader_destroy (grouper->reader);
146       if (grouper->destroy != NULL)
147         grouper->destroy (grouper->aux);
148       free (grouper);
149
150       ok = !taint_has_tainted_successor (taint);
151       taint_destroy (taint);
152       return ok;
153     }
154   else
155     return true;
156 }
157 \f
158 /* Casegrouper based on equal values of variables from case to
159    case. */
160
161 static bool casegrouper_vars_same_group (const struct ccase *,
162                                          const struct ccase *,
163                                          void *);
164 static void casegrouper_vars_destroy (void *);
165
166 /* Creates and returns a casegrouper that reads data from READER
167    and breaks it into contiguous groups of cases that have equal
168    values for the VAR_CNT variables in VARS.  If VAR_CNT is 0,
169    then all the cases will be put in a single group. */
170 struct casegrouper *
171 casegrouper_create_vars (struct casereader *reader,
172                          const struct variable *const *vars,
173                          size_t var_cnt)
174 {
175   if (var_cnt > 0) 
176     {
177       struct subcase *sc = xmalloc (sizeof *sc);
178       subcase_init_vars (sc, vars, var_cnt);
179       return casegrouper_create_func (reader, casegrouper_vars_same_group,
180                                       casegrouper_vars_destroy, sc); 
181     }
182   else
183     return casegrouper_create_func (reader, NULL, NULL, NULL);
184 }
185
186 /* Creates and returns a casegrouper that reads data from READER
187    and breaks it into contiguous groups of cases that have equal
188    values for the SPLIT FILE variables in DICT.  If DICT has no
189    SPLIT FILE variables, then all the cases will be put into a
190    single group. */
191 struct casegrouper *
192 casegrouper_create_splits (struct casereader *reader,
193                            const struct dictionary *dict)
194 {
195   return casegrouper_create_vars (reader,
196                                   dict_get_split_vars (dict),
197                                   dict_get_split_cnt (dict));
198 }
199
200 /* Creates and returns a casegrouper that reads data from READER
201    and breaks it into contiguous groups of cases that have equal
202    values for the variables used for sorting in SC.  If SC is
203    empty (contains no fields), then all the cases will be put
204    into a single group. */
205 struct casegrouper *
206 casegrouper_create_subcase (struct casereader *reader,
207                             const struct subcase *sc)
208 {
209   if (subcase_get_n_fields (sc) > 0) 
210     {
211       struct subcase *sc_copy = xmalloc (sizeof *sc);
212       subcase_clone (sc_copy, sc);
213       return casegrouper_create_func (reader, casegrouper_vars_same_group,
214                                       casegrouper_vars_destroy, sc_copy); 
215     }
216   else
217     return casegrouper_create_func (reader, NULL, NULL, NULL);
218 }
219
220 /* "same_group" function for an equal-variables casegrouper. */
221 static bool
222 casegrouper_vars_same_group (const struct ccase *a, const struct ccase *b,
223                              void *sc_)
224 {
225   struct subcase *sc = sc_;
226   return subcase_equal (sc, a, sc, b);
227 }
228
229 /* "destroy" for an equal-variables casegrouper. */
230 static void
231 casegrouper_vars_destroy (void *sc_)
232 {
233   struct subcase *sc = sc_;
234   if (sc != NULL) 
235     {
236       subcase_destroy (sc);
237       free (sc); 
238     }
239 }