Test both compressed and uncompressed system files with very long
[pspp] / src / data / casegrouper.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casegrouper.h>
20
21 #include <stdlib.h>
22
23 #include <data/case-ordering.h>
24 #include <data/casereader.h>
25 #include <data/casewriter.h>
26 #include <data/dictionary.h>
27 #include <libpspp/taint.h>
28
29 #include "xalloc.h"
30
31 /* A casegrouper. */
32 struct casegrouper
33   {
34     struct casereader *reader;  /* Source of input cases. */
35     struct taint *taint;        /* Error status for casegrouper. */
36
37     /* Functions for grouping cases. */
38     bool (*same_group) (const struct ccase *, const struct ccase *, void *aux);
39     void (*destroy) (void *aux);
40     void *aux;
41   };
42
43 /* Creates and returns a new casegrouper that takes its input
44    from READER.  SAME_GROUP is used to decide which cases are in
45    a group: it returns true if the pair of cases provided are in
46    the same group, false otherwise.  DESTROY will be called when
47    the casegrouper is destroyed and should free any storage
48    needed by SAME_GROUP.
49
50    SAME_GROUP may be a null pointer.  If so, READER's entire
51    contents is considered to be a single group. */
52 struct casegrouper *
53 casegrouper_create_func (struct casereader *reader,
54                          bool (*same_group) (const struct ccase *,
55                                              const struct ccase *,
56                                              void *aux),
57                          void (*destroy) (void *aux),
58                          void *aux)
59 {
60   struct casegrouper *grouper = xmalloc (sizeof *grouper);
61   grouper->reader = casereader_rename (reader);
62   grouper->taint = taint_clone (casereader_get_taint (grouper->reader));
63   grouper->same_group = same_group;
64   grouper->destroy = destroy;
65   grouper->aux = aux;
66   return grouper;
67 }
68
69 /* Obtains the next group of cases from GROUPER.  Returns true if
70    successful, false if no groups remain.  If successful, *READER
71    is set to the casereader for the new group; otherwise, it is
72    set to NULL. */
73 bool
74 casegrouper_get_next_group (struct casegrouper *grouper,
75                             struct casereader **reader)
76 {
77   /* FIXME: we really shouldn't need a temporary casewriter for
78      the common case where we read an entire group's data before
79      going on to the next. */
80   if (grouper->same_group != NULL)
81     {
82       struct casewriter *writer;
83       struct ccase group_case, tmp;
84
85       if (!casereader_read (grouper->reader, &group_case))
86         {
87           *reader = NULL;
88           return false;
89         }
90
91       writer = autopaging_writer_create (casereader_get_value_cnt (grouper->reader));
92       case_clone (&tmp, &group_case);
93       casewriter_write (writer, &tmp);
94
95       while (casereader_peek (grouper->reader, 0, &tmp)
96              && grouper->same_group (&group_case, &tmp, grouper->aux))
97         {
98           struct ccase discard;
99           casereader_read (grouper->reader, &discard);
100           case_destroy (&discard);
101           casewriter_write (writer, &tmp);
102         }
103       case_destroy (&tmp);
104       case_destroy (&group_case);
105
106       *reader = casewriter_make_reader (writer);
107       return true;
108     }
109   else
110     {
111       if (grouper->reader != NULL)
112         {
113           *reader = grouper->reader;
114           grouper->reader = NULL;
115           return true;
116         }
117       else
118         {
119           *reader = NULL;
120           return false;
121         }
122     }
123 }
124
125 /* Destroys GROUPER.  Returns false if GROUPER's input casereader
126    or any state derived from it had become tainted, which means
127    that an I/O error or other serious error occurred in
128    processing data derived from GROUPER; otherwise, return true. */
129 bool
130 casegrouper_destroy (struct casegrouper *grouper)
131 {
132   if (grouper != NULL)
133     {
134       struct taint *taint = grouper->taint;
135       bool ok;
136
137       casereader_destroy (grouper->reader);
138       if (grouper->destroy != NULL)
139         grouper->destroy (grouper->aux);
140       free (grouper);
141
142       ok = !taint_has_tainted_successor (taint);
143       taint_destroy (taint);
144       return ok;
145     }
146   else
147     return true;
148 }
149 \f
150 /* Casegrouper based on equal values of variables from case to
151    case. */
152
153 /* Casegrouper based on equal variables. */
154 struct casegrouper_vars
155   {
156     const struct variable **vars; /* Variables to compare. */
157     size_t var_cnt;               /* Number of variables. */
158   };
159
160 static bool casegrouper_vars_same_group (const struct ccase *,
161                                          const struct ccase *,
162                                          void *);
163 static void casegrouper_vars_destroy (void *);
164
165 /* Creates and returns a casegrouper that reads data from READER
166    and breaks it into contiguous groups of cases that have equal
167    values for the VAR_CNT variables in VARS.  If VAR_CNT is 0,
168    then all the cases will be put in a single group. */
169 struct casegrouper *
170 casegrouper_create_vars (struct casereader *reader,
171                          const struct variable *const *vars,
172                          size_t var_cnt)
173 {
174   if (var_cnt > 0)
175     {
176       struct casegrouper_vars *cv = xmalloc (sizeof *cv);
177       cv->vars = xmemdup (vars, sizeof *vars * var_cnt);
178       cv->var_cnt = var_cnt;
179       return casegrouper_create_func (reader,
180                                       casegrouper_vars_same_group,
181                                       casegrouper_vars_destroy,
182                                       cv);
183     }
184   else
185     return casegrouper_create_func (reader, NULL, NULL, NULL);
186 }
187
188 /* Creates and returns a casegrouper that reads data from READER
189    and breaks it into contiguous groups of cases that have equal
190    values for the SPLIT FILE variables in DICT.  If DICT has no
191    SPLIT FILE variables, then all the cases will be put into a
192    single group. */
193 struct casegrouper *
194 casegrouper_create_splits (struct casereader *reader,
195                            const struct dictionary *dict)
196 {
197   return casegrouper_create_vars (reader,
198                                   dict_get_split_vars (dict),
199                                   dict_get_split_cnt (dict));
200 }
201
202 /* Creates and returns a casegrouper that reads data from READER
203    and breaks it into contiguous groups of cases that have equal
204    values for the variables used for sorting in CO.  If CO is
205    empty (contains no sort keys), then all the cases will be put
206    into a single group. */
207 struct casegrouper *
208 casegrouper_create_case_ordering (struct casereader *reader,
209                                   const struct case_ordering *co)
210 {
211   const struct variable **vars;
212   size_t var_cnt;
213   struct casegrouper *grouper;
214
215   case_ordering_get_vars (co, &vars, &var_cnt);
216   grouper = casegrouper_create_vars (reader, vars, var_cnt);
217   free (vars);
218
219   return grouper;
220 }
221
222 /* "same_group" function for an equal-variables casegrouper. */
223 static bool
224 casegrouper_vars_same_group (const struct ccase *a, const struct ccase *b,
225                              void *cv_)
226 {
227   struct casegrouper_vars *cv = cv_;
228   return case_compare (a, b, cv->vars, cv->var_cnt) == 0;
229 }
230
231 /* "destroy" for an equal-variables casegrouper. */
232 static void
233 casegrouper_vars_destroy (void *cv_)
234 {
235   struct casegrouper_vars *cv = cv_;
236   free (cv->vars);
237   free (cv);
238 }
239