Patch #6302.
[pspp-builds.git] / src / language / data-io / data-parser.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <language/data-io/data-parser.h>
20
21 #include <stdint.h>
22 #include <stdlib.h>
23
24 #include <data/casereader-provider.h>
25 #include <data/data-in.h>
26 #include <data/dictionary.h>
27 #include <data/format.h>
28 #include <data/file-handle-def.h>
29 #include <data/procedure.h>
30 #include <data/settings.h>
31 #include <language/data-io/data-reader.h>
32 #include <libpspp/message.h>
33 #include <libpspp/str.h>
34 #include <output/table.h>
35
36 #include "xalloc.h"
37
38 #include "gettext.h"
39 #define _(msgid) gettext (msgid)
40
41 /* Data parser for textual data like that read by DATA LIST. */
42 struct data_parser
43   {
44     enum data_parser_type type; /* Type of data to parse. */
45     int skip_records;           /* Records to skip before first real data. */
46     casenumber max_cases;       /* Max number of cases to read. */
47     int percent_cases;          /* Approximate percent of cases to read. */
48
49     struct field *fields;       /* Fields to parse. */
50     size_t field_cnt;           /* Number of fields. */
51     size_t field_allocated;     /* Number of fields spaced allocated for. */
52
53     /* DP_DELIMITED parsers only. */
54     bool span;                  /* May cases span multiple records? */
55     bool empty_line_has_field;  /* Does an empty line have an (empty) field? */
56     struct substring quotes;    /* Characters that can quote separators. */
57     struct substring soft_seps; /* Two soft separators act like just one. */
58     struct substring hard_seps; /* Two hard separators yield empty fields. */
59     struct string any_sep;      /* Concatenation of soft_seps and hard_seps. */
60
61     /* DP_FIXED parsers only. */
62     int records_per_case;       /* Number of records in each case. */
63   };
64
65 /* How to parse one variable. */
66 struct field
67   {
68     struct fmt_spec format;     /* Input format of this field. */
69     int case_idx;               /* First value in case. */
70     char *name;                 /* Var name for error messages and tables. */
71
72     /* DP_FIXED only. */
73     int record;                 /* Record number (1-based). */
74     int first_column;           /* First column in record (1-based). */
75   };
76
77 static void set_any_sep (struct data_parser *parser);
78
79 /* Creates and returns a new data parser. */
80 struct data_parser *
81 data_parser_create (void)
82 {
83   struct data_parser *parser = xmalloc (sizeof *parser);
84
85   parser->type = DP_FIXED;
86   parser->skip_records = 0;
87   parser->max_cases = -1;
88   parser->percent_cases = 100;
89
90   parser->fields = NULL;
91   parser->field_cnt = 0;
92   parser->field_allocated = 0;
93
94   parser->span = true;
95   parser->empty_line_has_field = false;
96   ss_alloc_substring (&parser->quotes, ss_cstr ("\"'"));
97   ss_alloc_substring (&parser->soft_seps, ss_cstr (CC_SPACES));
98   ss_alloc_substring (&parser->hard_seps, ss_cstr (","));
99   ds_init_empty (&parser->any_sep);
100   set_any_sep (parser);
101
102   parser->records_per_case = 0;
103
104   return parser;
105 }
106
107 /* Destroys PARSER. */
108 void
109 data_parser_destroy (struct data_parser *parser)
110 {
111   if (parser != NULL)
112     {
113       size_t i;
114
115       for (i = 0; i < parser->field_cnt; i++)
116         free (parser->fields[i].name);
117       free (parser->fields);
118       ss_dealloc (&parser->quotes);
119       ss_dealloc (&parser->soft_seps);
120       ss_dealloc (&parser->hard_seps);
121       ds_destroy (&parser->any_sep);
122       free (parser);
123     }
124 }
125
126 /* Returns the type of PARSER (either DP_DELIMITED or DP_FIXED). */
127 enum data_parser_type
128 data_parser_get_type (const struct data_parser *parser)
129 {
130   return parser->type;
131 }
132
133 /* Sets the type of PARSER to TYPE (either DP_DELIMITED or
134    DP_FIXED). */
135 void
136 data_parser_set_type (struct data_parser *parser, enum data_parser_type type)
137 {
138   assert (parser->field_cnt == 0);
139   assert (type == DP_FIXED || type == DP_DELIMITED);
140   parser->type = type;
141 }
142
143 /* Configures PARSER to skip the specified number of
144    INITIAL_RECORDS_TO_SKIP before parsing any data.  By default,
145    no records are skipped. */
146 void
147 data_parser_set_skip (struct data_parser *parser, int initial_records_to_skip)
148 {
149   assert (initial_records_to_skip >= 0);
150   parser->skip_records = initial_records_to_skip;
151 }
152
153 /* Sets the maximum number of cases parsed by PARSER to
154    MAX_CASES.  The default is -1, meaning no limit. */
155 void
156 data_parser_set_case_limit (struct data_parser *parser, casenumber max_cases)
157 {
158   parser->max_cases = max_cases;
159 }
160
161 /* Sets the percentage of cases that PARSER should read from the
162    input file to PERCENT_CASES.  By default, all cases are
163    read. */
164 void
165 data_parser_set_case_percent (struct data_parser *parser, int percent_cases)
166 {
167   assert (percent_cases >= 0 && percent_cases <= 100);
168   parser->percent_cases = percent_cases;
169 }
170
171 /* Returns true if PARSER is configured to allow cases to span
172    multiple records. */
173 bool
174 data_parser_get_span (const struct data_parser *parser)
175 {
176   return parser->span;
177 }
178
179 /* If MAY_CASES_SPAN_RECORDS is true, configures PARSER to allow
180    a single case to span multiple records and multiple cases to
181    occupy a single record.  If MAY_CASES_SPAN_RECORDS is false,
182    configures PARSER to require each record to contain exactly
183    one case.
184
185    This setting affects parsing of DP_DELIMITED files only. */
186 void
187 data_parser_set_span (struct data_parser *parser, bool may_cases_span_records)
188 {
189   parser->span = may_cases_span_records;
190 }
191
192 /* If EMPTY_LINE_HAS_FIELD is true, configures PARSER to parse an
193    empty line as an empty field and to treat a hard delimiter
194    followed by end-of-line as an empty field.  If
195    EMPTY_LINE_HAS_FIELD is false, PARSER will skip empty lines
196    and hard delimiters at the end of lines without emitting empty
197    fields.
198
199    This setting affects parsing of DP_DELIMITED files only. */
200 void
201 data_parser_set_empty_line_has_field (struct data_parser *parser,
202                                       bool empty_line_has_field)
203 {
204   parser->empty_line_has_field = empty_line_has_field;
205 }
206
207 /* Sets the characters that may be used for quoting field
208    contents to QUOTES.  If QUOTES is empty, quoting will be
209    disabled.
210
211    The caller retains ownership of QUOTES.
212
213    This setting affects parsing of DP_DELIMITED files only. */
214 void
215 data_parser_set_quotes (struct data_parser *parser, struct substring quotes)
216 {
217   ss_dealloc (&parser->quotes);
218   ss_alloc_substring (&parser->quotes, quotes);
219 }
220
221 /* Sets PARSER's soft delimiters to DELIMITERS.  Soft delimiters
222    separate fields, but consecutive soft delimiters do not yield
223    empty fields.  (Ordinarily, only white space characters are
224    appropriate soft delimiters.)
225
226    The caller retains ownership of DELIMITERS.
227
228    This setting affects parsing of DP_DELIMITED files only. */
229 void
230 data_parser_set_soft_delimiters (struct data_parser *parser,
231                                  struct substring delimiters)
232 {
233   ss_dealloc (&parser->soft_seps);
234   ss_alloc_substring (&parser->soft_seps, delimiters);
235   set_any_sep (parser);
236 }
237
238 /* Sets PARSER's hard delimiters to DELIMITERS.  Hard delimiters
239    separate fields.  A consecutive pair of hard delimiters yield
240    an empty field.
241
242    The caller retains ownership of DELIMITERS.
243
244    This setting affects parsing of DP_DELIMITED files only. */
245 void
246 data_parser_set_hard_delimiters (struct data_parser *parser,
247                                  struct substring delimiters)
248 {
249   ss_dealloc (&parser->hard_seps);
250   ss_alloc_substring (&parser->hard_seps, delimiters);
251   set_any_sep (parser);
252 }
253
254 /* Returns the number of records per case. */
255 int
256 data_parser_get_records (const struct data_parser *parser)
257 {
258   return parser->records_per_case;
259 }
260
261 /* Sets the number of records per case to RECORDS_PER_CASE.
262
263    This setting affects parsing of DP_FIXED files only. */
264 void
265 data_parser_set_records (struct data_parser *parser, int records_per_case)
266 {
267   assert (records_per_case >= 0);
268   assert (records_per_case >= parser->records_per_case);
269   parser->records_per_case = records_per_case;
270 }
271
272 static void
273 add_field (struct data_parser *p, const struct fmt_spec *format, int case_idx,
274            const char *name, int record, int first_column)
275 {
276   struct field *field;
277
278   if (p->field_cnt == p->field_allocated)
279     p->fields = x2nrealloc (p->fields, &p->field_allocated, sizeof *p->fields);
280   field = &p->fields[p->field_cnt++];
281   field->format = *format;
282   field->case_idx = case_idx;
283   field->name = xstrdup (name);
284   field->record = record;
285   field->first_column = first_column;
286 }
287
288 /* Adds a delimited field to the field parsed by PARSER, which
289    must be configured as a DP_DELIMITED parser.  The field is
290    parsed as input format FORMAT.  Its data will be stored into case
291    index CASE_INDEX.  Errors in input data will be reported
292    against variable NAME. */
293 void
294 data_parser_add_delimited_field (struct data_parser *parser,
295                                  const struct fmt_spec *format, int case_idx,
296                                  const char *name)
297 {
298   assert (parser->type == DP_DELIMITED);
299   add_field (parser, format, case_idx, name, 0, 0);
300 }
301
302 /* Adds a fixed field to the field parsed by PARSER, which
303    must be configured as a DP_FIXED parser.  The field is
304    parsed as input format FORMAT.  Its data will be stored into case
305    index CASE_INDEX.  Errors in input data will be reported
306    against variable NAME.  The field will be drawn from the
307    FORMAT->w columns in 1-based RECORD starting at 1-based
308    column FIRST_COLUMN.
309
310    RECORD must be at least as great as that of any field already
311    added; that is, fields must be added in increasing order of
312    record number.  If RECORD is greater than the current number
313    of records per case, the number of records per case are
314    increased as needed.  */
315 void
316 data_parser_add_fixed_field (struct data_parser *parser,
317                              const struct fmt_spec *format, int case_idx,
318                              const char *name,
319                              int record, int first_column)
320 {
321   assert (parser->type == DP_FIXED);
322   assert (parser->field_cnt == 0
323           || record >= parser->fields[parser->field_cnt - 1].record);
324   if (record > parser->records_per_case)
325     parser->records_per_case = record;
326   add_field (parser, format, case_idx, name, record, first_column);
327 }
328
329 /* Returns true if any fields have been added to PARSER, false
330    otherwise. */
331 bool
332 data_parser_any_fields (const struct data_parser *parser)
333 {
334   return parser->field_cnt > 0;
335 }
336
337 static void
338 set_any_sep (struct data_parser *parser)
339 {
340   ds_assign_substring (&parser->any_sep, parser->soft_seps);
341   ds_put_substring (&parser->any_sep, parser->hard_seps);
342 }
343 \f
344 static bool parse_delimited_span (const struct data_parser *,
345                                   struct dfm_reader *, struct ccase *);
346 static bool parse_delimited_no_span (const struct data_parser *,
347                                      struct dfm_reader *, struct ccase *);
348 static bool parse_fixed (const struct data_parser *,
349                          struct dfm_reader *, struct ccase *);
350
351 /* Reads a case from DFM into C, parsing it with PARSER.
352    Returns true if successful, false at end of file or on I/O error. */
353 bool
354 data_parser_parse (struct data_parser *parser, struct dfm_reader *reader,
355                    struct ccase *c)
356 {
357   bool retval;
358
359   assert (data_parser_any_fields (parser));
360
361   /* Skip the requested number of records before reading the
362      first case. */
363   for (; parser->skip_records > 0; parser->skip_records--)
364     {
365       if (dfm_eof (reader))
366         return false;
367       dfm_forward_record (reader);
368     }
369
370   /* Limit cases. */
371   if (parser->max_cases != -1 && parser->max_cases-- == 0)
372     return false;
373   if (parser->percent_cases < 100
374       && dfm_get_percent_read (reader) >= parser->percent_cases)
375     return false;
376
377   dfm_push (reader);
378   if (parser->type == DP_DELIMITED)
379     {
380       if (parser->span)
381         retval = parse_delimited_span (parser, reader, c);
382       else
383         retval = parse_delimited_no_span (parser, reader, c);
384     }
385   else
386     retval = parse_fixed (parser, reader, c);
387   dfm_pop (reader);
388
389   return retval;
390 }
391
392 /* Extracts a delimited field from the current position in the
393    current record according to PARSER, reading data from READER.
394
395    *FIELD is set to the field content.  The caller must not or
396    destroy this constant string.
397
398    After parsing the field, sets the current position in the
399    record to just past the field and any trailing delimiter.
400    Returns 0 on failure or a 1-based column number indicating the
401    beginning of the field on success. */
402 static bool
403 cut_field (const struct data_parser *parser, struct dfm_reader *reader,
404            struct substring *field)
405 {
406   struct substring line, p;
407
408   if (dfm_eof (reader))
409     return false;
410   if (ss_is_empty (parser->hard_seps))
411     dfm_expand_tabs (reader);
412   line = p = dfm_get_record (reader);
413
414   /* Skip leading soft separators. */
415   ss_ltrim (&p, parser->soft_seps);
416
417   /* Handle empty or completely consumed lines. */
418   if (ss_is_empty (p))
419     {
420       if (!parser->empty_line_has_field || dfm_columns_past_end (reader) > 0)
421         return false;
422       else
423         {
424           *field = p;
425           dfm_forward_columns (reader, 1);
426           return true;
427         }
428     }
429
430   if (ss_find_char (parser->quotes, ss_first (p)) != SIZE_MAX)
431     {
432       /* Quoted field. */
433       if (!ss_get_until (&p, ss_get_char (&p), field))
434         msg (SW, _("Quoted string extends beyond end of line."));
435
436       /* Skip trailing soft separator and a single hard separator
437          if present. */
438       ss_ltrim (&p, parser->soft_seps);
439       if (!ss_is_empty (p)
440           && ss_find_char (parser->hard_seps, ss_first (p)) != SIZE_MAX)
441         ss_advance (&p, 1);
442     }
443   else
444     {
445       /* Regular field. */
446       ss_get_chars (&p, ss_cspan (p, ds_ss (&parser->any_sep)), field);
447       if (!ss_ltrim (&p, parser->soft_seps) || ss_is_empty (p))
448         {
449           /* Advance past a trailing hard separator,
450              regardless of whether one actually existed.  If
451              we "skip" a delimiter that was not actually
452              there, then we will return end-of-line on our
453              next call, which is what we want. */
454           dfm_forward_columns (reader, 1);
455         }
456     }
457   dfm_forward_columns (reader, ss_length (line) - ss_length (p));
458
459   return true;
460 }
461
462 /* Reads a case from READER into C, parsing it according to
463    fixed-format syntax rules in PARSER.
464    Returns true if successful, false at end of file or on I/O error. */
465 static bool
466 parse_fixed (const struct data_parser *parser, struct dfm_reader *reader,
467              struct ccase *c)
468 {
469   enum legacy_encoding encoding = dfm_reader_get_legacy_encoding (reader);
470   struct field *f;
471   int row;
472
473   if (dfm_eof (reader))
474     return false;
475
476   f = parser->fields;
477   for (row = 1; row <= parser->records_per_case; row++)
478     {
479       struct substring line;
480
481       if (dfm_eof (reader))
482         {
483           msg (SW, _("Partial case of %d of %d records discarded."),
484                row - 1, parser->records_per_case);
485           return false;
486         }
487       dfm_expand_tabs (reader);
488       line = dfm_get_record (reader);
489
490       for (; f < &parser->fields[parser->field_cnt] && f->record == row; f++)
491         data_in (ss_substr (line, f->first_column - 1,
492                             f->format.w),
493                  encoding, f->format.type, f->format.d,
494                  f->first_column, case_data_rw_idx (c, f->case_idx),
495                  fmt_var_width (&f->format));
496
497       dfm_forward_record (reader);
498     }
499
500   return true;
501 }
502
503 /* Reads a case from READER into C, parsing it according to
504    free-format syntax rules in PARSER.
505    Returns true if successful, false at end of file or on I/O error. */
506 static bool
507 parse_delimited_span (const struct data_parser *parser,
508                       struct dfm_reader *reader, struct ccase *c)
509 {
510   enum legacy_encoding encoding = dfm_reader_get_legacy_encoding (reader);
511   struct field *f;
512
513   for (f = parser->fields; f < &parser->fields[parser->field_cnt]; f++)
514     {
515       struct substring s;
516
517       /* Cut out a field and read in a new record if necessary. */
518       while (!cut_field (parser, reader, &s))
519         {
520           if (!dfm_eof (reader))
521             dfm_forward_record (reader);
522           if (dfm_eof (reader))
523             {
524               if (f > parser->fields)
525                 msg (SW, _("Partial case discarded.  The first variable "
526                            "missing was %s."), f->name);
527               return false;
528             }
529         }
530
531       data_in (s, encoding, f->format.type, 0,
532                dfm_get_column (reader, ss_data (s)),
533                case_data_rw_idx (c, f->case_idx),
534                fmt_var_width (&f->format));
535     }
536   return true;
537 }
538
539 /* Reads a case from READER into C, parsing it according to
540    delimited syntax rules with one case per record in PARSER.
541    Returns true if successful, false at end of file or on I/O error. */
542 static bool
543 parse_delimited_no_span (const struct data_parser *parser,
544                          struct dfm_reader *reader, struct ccase *c)
545 {
546   enum legacy_encoding encoding = dfm_reader_get_legacy_encoding (reader);
547   struct substring s;
548   struct field *f;
549
550   if (dfm_eof (reader))
551     return false;
552
553   for (f = parser->fields; f < &parser->fields[parser->field_cnt]; f++)
554     {
555       if (!cut_field (parser, reader, &s))
556         {
557           if (get_undefined ())
558             msg (SW, _("Missing value(s) for all variables from %s onward.  "
559                        "These will be filled with the system-missing value "
560                        "or blanks, as appropriate."),
561                  f->name);
562           for (; f < &parser->fields[parser->field_cnt]; f++)
563             {
564               int width = fmt_var_width (&f->format);
565               if (width == 0)
566                 case_data_rw_idx (c, f->case_idx)->f = SYSMIS;
567               else
568                 memset (case_data_rw_idx (c, f->case_idx)->s, ' ', width);
569             }
570           goto exit;
571         }
572
573       data_in (s, encoding, f->format.type, 0,
574                dfm_get_column (reader, ss_data (s)),
575                case_data_rw_idx (c, f->case_idx),
576                fmt_var_width (&f->format));
577     }
578
579   s = dfm_get_record (reader);
580   ss_ltrim (&s, parser->soft_seps);
581   if (!ss_is_empty (s))
582     msg (SW, _("Record ends in data not part of any field."));
583
584 exit:
585   dfm_forward_record (reader);
586   return true;
587 }
588 \f
589 /* Displays a table giving information on fixed-format variable
590    parsing on DATA LIST. */
591 static void
592 dump_fixed_table (const struct data_parser *parser,
593                   const struct file_handle *fh)
594 {
595   struct tab_table *t;
596   size_t i;
597
598   t = tab_create (4, parser->field_cnt + 1, 0);
599   tab_columns (t, TAB_COL_DOWN, 1);
600   tab_headers (t, 0, 0, 1, 0);
601   tab_text (t, 0, 0, TAB_CENTER | TAT_TITLE, _("Variable"));
602   tab_text (t, 1, 0, TAB_CENTER | TAT_TITLE, _("Record"));
603   tab_text (t, 2, 0, TAB_CENTER | TAT_TITLE, _("Columns"));
604   tab_text (t, 3, 0, TAB_CENTER | TAT_TITLE, _("Format"));
605   tab_box (t, TAL_1, TAL_1, TAL_0, TAL_1, 0, 0, 3, parser->field_cnt);
606   tab_hline (t, TAL_2, 0, 3, 1);
607   tab_dim (t, tab_natural_dimensions);
608
609   for (i = 0; i < parser->field_cnt; i++)
610     {
611       struct field *f = &parser->fields[i];
612       char fmt_string[FMT_STRING_LEN_MAX + 1];
613       int row = i + 1;
614
615       tab_text (t, 0, row, TAB_LEFT, f->name);
616       tab_text (t, 1, row, TAT_PRINTF, "%d", f->record);
617       tab_text (t, 2, row, TAT_PRINTF, "%3d-%3d",
618                 f->first_column, f->first_column + f->format.w - 1);
619       tab_text (t, 3, row, TAB_LEFT | TAB_FIX,
620                 fmt_to_string (&f->format, fmt_string));
621     }
622
623   tab_title (t, ngettext ("Reading %d record from %s.",
624                           "Reading %d records from %s.",
625                           parser->records_per_case),
626              parser->records_per_case, fh_get_name (fh));
627   tab_submit (t);
628 }
629
630 /* Displays a table giving information on free-format variable parsing
631    on DATA LIST. */
632 static void
633 dump_delimited_table (const struct data_parser *parser,
634                       const struct file_handle *fh)
635 {
636   struct tab_table *t;
637   size_t i;
638
639   t = tab_create (2, parser->field_cnt + 1, 0);
640   tab_columns (t, TAB_COL_DOWN, 1);
641   tab_headers (t, 0, 0, 1, 0);
642   tab_text (t, 0, 0, TAB_CENTER | TAT_TITLE, _("Variable"));
643   tab_text (t, 1, 0, TAB_CENTER | TAT_TITLE, _("Format"));
644   tab_box (t, TAL_1, TAL_1, TAL_0, TAL_1, 0, 0, 1, parser->field_cnt);
645   tab_hline (t, TAL_2, 0, 1, 1);
646   tab_dim (t, tab_natural_dimensions);
647
648   for (i = 0; i < parser->field_cnt; i++)
649     {
650       struct field *f = &parser->fields[i];
651       char str[FMT_STRING_LEN_MAX + 1];
652       int row = i + 1;
653
654       tab_text (t, 0, row, TAB_LEFT, f->name);
655       tab_text (t, 1, row, TAB_LEFT | TAB_FIX,
656                 fmt_to_string (&f->format, str));
657     }
658
659   tab_title (t, _("Reading free-form data from %s."), fh_get_name (fh));
660
661   tab_submit (t);
662 }
663
664 /* Displays a table giving information on how PARSER will read
665    data from FH. */
666 void
667 data_parser_output_description (struct data_parser *parser,
668                                 const struct file_handle *fh)
669 {
670   if (parser->type == DP_FIXED)
671     dump_fixed_table (parser, fh);
672   else
673     dump_delimited_table (parser, fh);
674 }
675 \f
676 /* Data parser input program. */
677 struct data_parser_casereader
678   {
679     struct data_parser *parser; /* Parser. */
680     struct dfm_reader *reader;  /* Data file reader. */
681     size_t value_cnt;           /* Number of `union value's in case. */
682   };
683
684 static const struct casereader_class data_parser_casereader_class;
685
686 /* Replaces DS's active file by an input program that reads data
687    from READER according to the rules in PARSER, using DICT as
688    the underlying dictionary.  Ownership of PARSER and READER is
689    transferred to the input program, and ownership of DICT is
690    transferred to the dataset. */
691 void
692 data_parser_make_active_file (struct data_parser *parser, struct dataset *ds,
693                               struct dfm_reader *reader,
694                               struct dictionary *dict)
695 {
696   struct data_parser_casereader *r;
697   struct casereader *casereader;
698
699   r = xmalloc (sizeof *r);
700   r->parser = parser;
701   r->reader = reader;
702   r->value_cnt = dict_get_next_value_idx (dict);
703   casereader = casereader_create_sequential (NULL, r->value_cnt,
704                                              CASENUMBER_MAX,
705                                              &data_parser_casereader_class, r);
706   proc_set_active_file (ds, casereader, dict);
707 }
708
709 static bool
710 data_parser_casereader_read (struct casereader *reader UNUSED, void *r_,
711                            struct ccase *c)
712 {
713   struct data_parser_casereader *r = r_;
714   bool ok;
715
716   case_create (c, r->value_cnt);
717   ok = data_parser_parse (r->parser, r->reader, c);
718   if (!ok)
719     case_destroy (c);
720   return ok;
721 }
722
723 static void
724 data_parser_casereader_destroy (struct casereader *reader UNUSED, void *r_)
725 {
726   struct data_parser_casereader *r = r_;
727   if (dfm_reader_error (r->reader))
728     casereader_force_error (reader);
729   data_parser_destroy (r->parser);
730   dfm_close_reader (r->reader);
731   free (r);
732 }
733
734 static const struct casereader_class data_parser_casereader_class =
735   {
736     data_parser_casereader_read,
737     data_parser_casereader_destroy,
738     NULL,
739     NULL,
740   };