eb9ee4c8fc9ab52533756b09bfd1296f5305cc36
[pspp] / src / data / casereader-shim.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/casereader-shim.h"
20
21 #include <stdlib.h>
22
23 #include "data/casereader.h"
24 #include "data/casereader-provider.h"
25 #include "data/casewindow.h"
26 #include "data/settings.h"
27 #include "libpspp/taint.h"
28
29 #include "gl/xalloc.h"
30
31 /* A buffering shim casereader. */
32 struct casereader_shim
33   {
34     struct casewindow *window;          /* Window of buffered cases. */
35     struct casereader *subreader;       /* Subordinate casereader. */
36   };
37
38 static const struct casereader_random_class shim_class;
39
40 static bool buffer_case (struct casereader_shim *s);
41
42 /* Interposes a buffering shim on READER.
43
44    Returns the new shim.  The only legitimate use of the returned
45    casereader_shim is for calling casereader_shim_slurp().  If READER has no
46    clones already (which the caller should ensure, if it plans to use the
47    return value), then the returned casreader_shim is valid for that purpose
48    until, and only until, the READER's 'destroy' function is called. */
49 struct casereader_shim *
50 casereader_shim_insert (struct casereader *reader)
51 {
52   const struct caseproto *proto = casereader_get_proto (reader);
53   casenumber n_cases = casereader_get_n_cases (reader);
54   struct casereader_shim *s = xmalloc (sizeof *s);
55   s->window = casewindow_create (proto, settings_get_workspace_cases (proto));
56   s->subreader = casereader_create_random (proto, n_cases, &shim_class, s);
57   casereader_swap (reader, s->subreader);
58   taint_propagate (casewindow_get_taint (s->window),
59                    casereader_get_taint (reader));
60   taint_propagate (casereader_get_taint (s->subreader),
61                    casereader_get_taint (reader));
62   return s;
63 }
64
65 /* Reads all of the cases from S's subreader into S's buffer and destroys S's
66    subreader.  (This is a no-op if the subreader has already been
67    destroyed.)
68
69    Refer to the comment on casereader_shim_insert() for information on when
70    this function may be used. */
71 void
72 casereader_shim_slurp (struct casereader_shim *s)
73 {
74   while (buffer_case (s))
75     continue;
76 }
77
78 /* Reads a case from S's subreader and appends it to S's window.  Returns true
79    if successful, false at the end of S's subreader or upon an I/O error. */
80 static bool
81 buffer_case (struct casereader_shim *s)
82 {
83   struct ccase *tmp;
84
85   if (s->subreader == NULL)
86     return false;
87
88   tmp = casereader_read (s->subreader);
89   if (tmp == NULL)
90     {
91       casereader_destroy (s->subreader);
92       s->subreader = NULL;
93       return false;
94     }
95
96   casewindow_push_head (s->window, tmp);
97   return true;
98 }
99
100 /* Reads the case at the given 0-based OFFSET from the front of the window into
101    C.  Returns the case if successful, or a null pointer if OFFSET is beyond
102    the end of file or upon I/O error.  The caller must call case_unref() on the
103    returned case when it is no longer needed. */
104 static struct ccase *
105 casereader_shim_read (struct casereader *reader UNUSED, void *s_,
106                       casenumber offset)
107 {
108   struct casereader_shim *s = s_;
109
110   while (casewindow_get_n_cases (s->window) <= offset)
111     if (!buffer_case (s))
112       return false;
113
114   return casewindow_get_case (s->window, offset);
115 }
116
117 /* Destroys S. */
118 static void
119 casereader_shim_destroy (struct casereader *reader UNUSED, void *s_)
120 {
121   struct casereader_shim *s = s_;
122   casewindow_destroy (s->window);
123   casereader_destroy (s->subreader);
124   free (s);
125 }
126
127 /* Discards CNT cases from the front of S's window. */
128 static void
129 casereader_shim_advance (struct casereader *reader UNUSED, void *s_,
130                          casenumber n_cases)
131 {
132   struct casereader_shim *s = s_;
133   casewindow_pop_tail (s->window, n_cases);
134 }
135
136 /* Class for the buffered reader. */
137 static const struct casereader_random_class shim_class =
138   {
139     casereader_shim_read,
140     casereader_shim_destroy,
141     casereader_shim_advance,
142   };