d2be9cfaad3b2a6a0653834dae5b173f0e93568a
[pspp-builds.git] / src / data / casewindow.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 /* This casewindow implementation in terms of an class interface
18    is undoubtedly a form of over-abstraction.  However, it works
19    and the extra abstraction seems to be harmless. */
20
21 #include <config.h>
22
23 #include <data/casewindow.h>
24
25 #include <stdlib.h>
26
27 #include <data/case-tmpfile.h>
28 #include <libpspp/assertion.h>
29 #include <libpspp/compiler.h>
30 #include <libpspp/deque.h>
31 #include <libpspp/taint.h>
32
33 #include "xalloc.h"
34
35 /* A queue of cases. */
36 struct casewindow
37   {
38     /* Common data. */
39     size_t value_cnt;             /* Number of values per case. */
40     casenumber max_in_core_cases; /* Max cases before dumping to disk. */
41     struct taint *taint;          /* Taint status. */
42
43     /* Implementation data. */
44     const struct casewindow_class *class;
45     void *aux;
46   };
47
48 /* Implementation of a casewindow. */
49 struct casewindow_class
50   {
51     void *(*create) (struct taint *, size_t value_cnt);
52     void (*destroy) (void *aux);
53     void (*push_head) (void *aux, struct ccase *);
54     void (*pop_tail) (void *aux, casenumber cnt);
55     struct ccase *(*get_case) (void *aux, casenumber ofs);
56     casenumber (*get_case_cnt) (const void *aux);
57   };
58
59 /* Classes. */
60 static const struct casewindow_class casewindow_memory_class;
61 static const struct casewindow_class casewindow_file_class;
62
63 /* Creates and returns a new casewindow using the given
64    parameters. */
65 static struct casewindow *
66 do_casewindow_create (struct taint *taint,
67                       size_t value_cnt, casenumber max_in_core_cases)
68 {
69   struct casewindow *cw = xmalloc (sizeof *cw);
70   cw->class = (max_in_core_cases
71               ? &casewindow_memory_class
72               : &casewindow_file_class);
73   cw->aux = cw->class->create (taint, value_cnt);
74   cw->value_cnt = value_cnt;
75   cw->max_in_core_cases = max_in_core_cases;
76   cw->taint = taint;
77   return cw;
78 }
79
80 /* Creates and returns a new casewindow for cases with VALUE_CNT
81    values each.  If the casewindow holds more than
82    MAX_IN_CORE_CASES cases at any time, its cases will be dumped
83    to disk; otherwise, its cases will be held in memory. */
84 struct casewindow *
85 casewindow_create (size_t value_cnt, casenumber max_in_core_cases)
86 {
87   return do_casewindow_create (taint_create (), value_cnt, max_in_core_cases);
88 }
89
90 /* Destroys casewindow CW.
91    Returns true if CW was tainted, which is caused by an I/O
92    error or by taint propagation to the casewindow. */
93 bool
94 casewindow_destroy (struct casewindow *cw)
95 {
96   bool ok = true;
97   if (cw != NULL)
98     {
99       cw->class->destroy (cw->aux);
100       ok = taint_destroy (cw->taint);
101       free (cw);
102     }
103   return ok;
104 }
105
106 /* Swaps the contents of casewindows A and B. */
107 static void
108 casewindow_swap (struct casewindow *a, struct casewindow *b)
109 {
110   struct casewindow tmp = *a;
111   *a = *b;
112   *b = tmp;
113 }
114
115 /* Dumps the contents of casewindow OLD to disk. */
116 static void
117 casewindow_to_disk (struct casewindow *old)
118 {
119   struct casewindow *new;
120   new = do_casewindow_create (taint_clone (old->taint), old->value_cnt, 0);
121   while (casewindow_get_case_cnt (old) > 0 && !casewindow_error (new))
122     {
123       struct ccase *c = casewindow_get_case (old, 0);
124       if (c == NULL)
125         break;
126       casewindow_pop_tail (old, 1);
127       casewindow_push_head (new, c);
128     }
129   casewindow_swap (old, new);
130   casewindow_destroy (new);
131 }
132
133 /* Pushes case C at the head of casewindow CW.
134    Case C becomes owned by the casewindow. */
135 void
136 casewindow_push_head (struct casewindow *cw, struct ccase *c)
137 {
138   if (!casewindow_error (cw))
139     {
140       cw->class->push_head (cw->aux, c);
141       if (!casewindow_error (cw))
142         {
143           casenumber case_cnt = cw->class->get_case_cnt (cw->aux);
144           if (case_cnt > cw->max_in_core_cases
145               && cw->class == &casewindow_memory_class)
146             casewindow_to_disk (cw);
147         }
148     }
149   else
150     case_unref (c);
151 }
152
153 /* Deletes CASE_CNT cases at the tail of casewindow CW. */
154 void
155 casewindow_pop_tail (struct casewindow *cw, casenumber case_cnt)
156 {
157   if (!casewindow_error (cw))
158     cw->class->pop_tail (cw->aux, case_cnt);
159 }
160
161 /* Returns the case that is CASE_IDX cases away from CW's tail
162    into C, or a null pointer on an I/O error or if CW is
163    otherwise tainted.  The caller must call case_unref() on the
164    returned case when it is no longer needed. */
165 struct ccase *
166 casewindow_get_case (const struct casewindow *cw_, casenumber case_idx)
167 {
168   struct casewindow *cw = (struct casewindow *) cw_;
169
170   assert (case_idx >= 0 && case_idx < casewindow_get_case_cnt (cw));
171   if (casewindow_error (cw))
172     return NULL;
173   return cw->class->get_case (cw->aux, case_idx);
174 }
175
176 /* Returns the number of cases in casewindow CW. */
177 casenumber
178 casewindow_get_case_cnt (const struct casewindow *cw)
179 {
180   return cw->class->get_case_cnt (cw->aux);
181 }
182
183 /* Returns the number of values per case in casewindow CW. */
184 size_t
185 casewindow_get_value_cnt (const struct casewindow *cw)
186 {
187   return cw->value_cnt;
188 }
189
190 /* Returns true if casewindow CW is tainted.
191    A casewindow is tainted by an I/O error or by taint
192    propagation to the casewindow. */
193 bool
194 casewindow_error (const struct casewindow *cw)
195 {
196   return taint_is_tainted (cw->taint);
197 }
198
199 /* Marks casewindow CW tainted. */
200 void
201 casewindow_force_error (struct casewindow *cw)
202 {
203   taint_set_taint (cw->taint);
204 }
205
206 /* Returns casewindow CW's taint object. */
207 const struct taint *
208 casewindow_get_taint (const struct casewindow *cw)
209 {
210   return cw->taint;
211 }
212 \f
213 /* In-memory casewindow data. */
214 struct casewindow_memory
215   {
216     struct deque deque;
217     struct ccase **cases;
218   };
219
220 static void *
221 casewindow_memory_create (struct taint *taint UNUSED, size_t value_cnt UNUSED)
222 {
223   struct casewindow_memory *cwm = xmalloc (sizeof *cwm);
224   cwm->cases = deque_init (&cwm->deque, 4, sizeof *cwm->cases);
225   return cwm;
226 }
227
228 static void
229 casewindow_memory_destroy (void *cwm_)
230 {
231   struct casewindow_memory *cwm = cwm_;
232   while (!deque_is_empty (&cwm->deque))
233     case_unref (cwm->cases[deque_pop_front (&cwm->deque)]);
234   free (cwm->cases);
235   free (cwm);
236 }
237
238 static void
239 casewindow_memory_push_head (void *cwm_, struct ccase *c)
240 {
241   struct casewindow_memory *cwm = cwm_;
242   if (deque_is_full (&cwm->deque))
243     cwm->cases = deque_expand (&cwm->deque, cwm->cases, sizeof *cwm->cases);
244   cwm->cases[deque_push_back (&cwm->deque)] = c;
245 }
246
247 static void
248 casewindow_memory_pop_tail (void *cwm_, casenumber case_cnt)
249 {
250   struct casewindow_memory *cwm = cwm_;
251   assert (deque_count (&cwm->deque) >= case_cnt);
252   while (case_cnt-- > 0)
253     case_unref (cwm->cases[deque_pop_front (&cwm->deque)]);
254 }
255
256 static struct ccase *
257 casewindow_memory_get_case (void *cwm_, casenumber ofs)
258 {
259   struct casewindow_memory *cwm = cwm_;
260   return case_ref (cwm->cases[deque_front (&cwm->deque, ofs)]);
261 }
262
263 static casenumber
264 casewindow_memory_get_case_cnt (const void *cwm_)
265 {
266   const struct casewindow_memory *cwm = cwm_;
267   return deque_count (&cwm->deque);
268 }
269
270 static const struct casewindow_class casewindow_memory_class =
271   {
272     casewindow_memory_create,
273     casewindow_memory_destroy,
274     casewindow_memory_push_head,
275     casewindow_memory_pop_tail,
276     casewindow_memory_get_case,
277     casewindow_memory_get_case_cnt,
278   };
279 \f
280 /* On-disk casewindow data. */
281 struct casewindow_file
282   {
283     struct case_tmpfile *file;
284     casenumber head, tail;
285   };
286
287 static void *
288 casewindow_file_create (struct taint *taint, size_t value_cnt)
289 {
290   struct casewindow_file *cwf = xmalloc (sizeof *cwf);
291   cwf->file = case_tmpfile_create (value_cnt);
292   cwf->head = cwf->tail = 0;
293   taint_propagate (case_tmpfile_get_taint (cwf->file), taint);
294   return cwf;
295 }
296
297 static void
298 casewindow_file_destroy (void *cwf_)
299 {
300   struct casewindow_file *cwf = cwf_;
301   case_tmpfile_destroy (cwf->file);
302   free (cwf);
303 }
304
305 static void
306 casewindow_file_push_head (void *cwf_, struct ccase *c)
307 {
308   struct casewindow_file *cwf = cwf_;
309   if (case_tmpfile_put_case (cwf->file, cwf->head, c))
310     cwf->head++;
311 }
312
313 static void
314 casewindow_file_pop_tail (void *cwf_, casenumber cnt)
315 {
316   struct casewindow_file *cwf = cwf_;
317   assert (cnt <= cwf->head - cwf->tail);
318   cwf->tail += cnt;
319   if (cwf->head == cwf->tail)
320     cwf->head = cwf->tail = 0;
321 }
322
323 static struct ccase *
324 casewindow_file_get_case (void *cwf_, casenumber ofs)
325 {
326   struct casewindow_file *cwf = cwf_;
327   return case_tmpfile_get_case (cwf->file, cwf->tail + ofs);
328 }
329
330 static casenumber
331 casewindow_file_get_case_cnt (const void *cwf_)
332 {
333   const struct casewindow_file *cwf = cwf_;
334   return cwf->head - cwf->tail;
335 }
336
337 static const struct casewindow_class casewindow_file_class =
338   {
339     casewindow_file_create,
340     casewindow_file_destroy,
341     casewindow_file_push_head,
342     casewindow_file_pop_tail,
343     casewindow_file_get_case,
344     casewindow_file_get_case_cnt,
345   };