03c6dbbb0ea36a421a956498f2fb4470c14771a3
[pspp] / src / language / data-io / data-parser.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013, 2016 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "language/data-io/data-parser.h"
20
21 #include <stdint.h>
22 #include <stdlib.h>
23
24 #include "data/casereader-provider.h"
25 #include "data/data-in.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/file-handle-def.h"
30 #include "data/settings.h"
31 #include "language/data-io/data-reader.h"
32 #include "libpspp/message.h"
33 #include "libpspp/str.h"
34 #include "output/pivot-table.h"
35
36 #include "gl/xalloc.h"
37
38 #include "gettext.h"
39 #define N_(msgid) msgid
40 #define _(msgid) gettext (msgid)
41
42 /* Data parser for textual data like that read by DATA LIST. */
43 struct data_parser
44   {
45     struct dictionary *dict;    /* Dictionary of destination */
46     enum data_parser_type type; /* Type of data to parse. */
47     int skip_records;           /* Records to skip before first real data. */
48
49     struct field *fields;       /* Fields to parse. */
50     size_t field_cnt;           /* Number of fields. */
51     size_t field_allocated;     /* Number of fields spaced allocated for. */
52
53     /* DP_DELIMITED parsers only. */
54     bool span;                  /* May cases span multiple records? */
55     bool empty_line_has_field;  /* Does an empty line have an (empty) field? */
56     bool warn_missing_fields;   /* Should missing fields be considered errors? */
57     struct substring quotes;    /* Characters that can quote separators. */
58     bool quote_escape;          /* Doubled quote acts as escape? */
59     struct substring soft_seps; /* Two soft separators act like just one. */
60     struct substring hard_seps; /* Two hard separators yield empty fields. */
61     struct string any_sep;      /* Concatenation of soft_seps and hard_seps. */
62
63     /* DP_FIXED parsers only. */
64     int records_per_case;       /* Number of records in each case. */
65   };
66
67 /* How to parse one variable. */
68 struct field
69   {
70     struct fmt_spec format;     /* Input format of this field. */
71     int case_idx;               /* First value in case. */
72     char *name;                 /* Var name for error messages and tables. */
73
74     /* DP_FIXED only. */
75     int record;                 /* Record number (1-based). */
76     int first_column;           /* First column in record (1-based). */
77   };
78
79 static void set_any_sep (struct data_parser *parser);
80
81 /* Creates and returns a new data parser. */
82 struct data_parser *
83 data_parser_create (struct dictionary *dict)
84 {
85   struct data_parser *parser = xmalloc (sizeof *parser);
86
87   parser->type = DP_FIXED;
88   parser->skip_records = 0;
89
90   parser->fields = NULL;
91   parser->field_cnt = 0;
92   parser->field_allocated = 0;
93   parser->dict = dict_ref (dict);
94
95   parser->span = true;
96   parser->empty_line_has_field = false;
97   parser->warn_missing_fields = true;
98   ss_alloc_substring (&parser->quotes, ss_cstr ("\"'"));
99   parser->quote_escape = false;
100   ss_alloc_substring (&parser->soft_seps, ss_cstr (CC_SPACES));
101   ss_alloc_substring (&parser->hard_seps, ss_cstr (","));
102   ds_init_empty (&parser->any_sep);
103   set_any_sep (parser);
104
105   parser->records_per_case = 0;
106
107   return parser;
108 }
109
110 /* Destroys PARSER. */
111 void
112 data_parser_destroy (struct data_parser *parser)
113 {
114   if (parser != NULL)
115     {
116       size_t i;
117
118       dict_unref (parser->dict);
119       for (i = 0; i < parser->field_cnt; i++)
120         free (parser->fields[i].name);
121       free (parser->fields);
122       ss_dealloc (&parser->quotes);
123       ss_dealloc (&parser->soft_seps);
124       ss_dealloc (&parser->hard_seps);
125       ds_destroy (&parser->any_sep);
126       free (parser);
127     }
128 }
129
130 /* Returns the type of PARSER (either DP_DELIMITED or DP_FIXED). */
131 enum data_parser_type
132 data_parser_get_type (const struct data_parser *parser)
133 {
134   return parser->type;
135 }
136
137 /* Sets the type of PARSER to TYPE (either DP_DELIMITED or
138    DP_FIXED). */
139 void
140 data_parser_set_type (struct data_parser *parser, enum data_parser_type type)
141 {
142   assert (parser->field_cnt == 0);
143   assert (type == DP_FIXED || type == DP_DELIMITED);
144   parser->type = type;
145 }
146
147 /* Configures PARSER to skip the specified number of
148    INITIAL_RECORDS_TO_SKIP before parsing any data.  By default,
149    no records are skipped. */
150 void
151 data_parser_set_skip (struct data_parser *parser, int initial_records_to_skip)
152 {
153   assert (initial_records_to_skip >= 0);
154   parser->skip_records = initial_records_to_skip;
155 }
156
157 /* Returns true if PARSER is configured to allow cases to span
158    multiple records. */
159 bool
160 data_parser_get_span (const struct data_parser *parser)
161 {
162   return parser->span;
163 }
164
165 /* If MAY_CASES_SPAN_RECORDS is true, configures PARSER to allow
166    a single case to span multiple records and multiple cases to
167    occupy a single record.  If MAY_CASES_SPAN_RECORDS is false,
168    configures PARSER to require each record to contain exactly
169    one case.
170
171    This setting affects parsing of DP_DELIMITED files only. */
172 void
173 data_parser_set_span (struct data_parser *parser, bool may_cases_span_records)
174 {
175   parser->span = may_cases_span_records;
176 }
177
178 /* If EMPTY_LINE_HAS_FIELD is true, configures PARSER to parse an
179    empty line as an empty field and to treat a hard delimiter
180    followed by end-of-line as an empty field.  If
181    EMPTY_LINE_HAS_FIELD is false, PARSER will skip empty lines
182    and hard delimiters at the end of lines without emitting empty
183    fields.
184
185    This setting affects parsing of DP_DELIMITED files only. */
186 void
187 data_parser_set_empty_line_has_field (struct data_parser *parser,
188                                       bool empty_line_has_field)
189 {
190   parser->empty_line_has_field = empty_line_has_field;
191 }
192
193
194 /* If WARN_MISSING_FIELDS is true, configures PARSER to emit a warning
195    and cause an error condition when a missing field is encountered.
196    If  WARN_MISSING_FIELDS is false, PARSER will silently fill such
197    fields with the system missing value.
198
199    This setting affects parsing of DP_DELIMITED files only. */
200 void
201 data_parser_set_warn_missing_fields (struct data_parser *parser,
202                                      bool warn_missing_fields)
203 {
204   parser->warn_missing_fields = warn_missing_fields;
205 }
206
207
208 /* Sets the characters that may be used for quoting field
209    contents to QUOTES.  If QUOTES is empty, quoting will be
210    disabled.
211
212    The caller retains ownership of QUOTES.
213
214    This setting affects parsing of DP_DELIMITED files only. */
215 void
216 data_parser_set_quotes (struct data_parser *parser, struct substring quotes)
217 {
218   ss_dealloc (&parser->quotes);
219   ss_alloc_substring (&parser->quotes, quotes);
220 }
221
222 /* If ESCAPE is false (the default setting), a character used for
223    quoting cannot itself be embedded within a quoted field.  If
224    ESCAPE is true, then a quote character can be embedded within
225    a quoted field by doubling it.
226
227    This setting affects parsing of DP_DELIMITED files only, and
228    only when at least one quote character has been set (with
229    data_parser_set_quotes). */
230 void
231 data_parser_set_quote_escape (struct data_parser *parser, bool escape)
232 {
233   parser->quote_escape = escape;
234 }
235
236 /* Sets PARSER's soft delimiters to DELIMITERS.  Soft delimiters
237    separate fields, but consecutive soft delimiters do not yield
238    empty fields.  (Ordinarily, only white space characters are
239    appropriate soft delimiters.)
240
241    The caller retains ownership of DELIMITERS.
242
243    This setting affects parsing of DP_DELIMITED files only. */
244 void
245 data_parser_set_soft_delimiters (struct data_parser *parser,
246                                  struct substring delimiters)
247 {
248   ss_dealloc (&parser->soft_seps);
249   ss_alloc_substring (&parser->soft_seps, delimiters);
250   set_any_sep (parser);
251 }
252
253 /* Sets PARSER's hard delimiters to DELIMITERS.  Hard delimiters
254    separate fields.  A consecutive pair of hard delimiters yield
255    an empty field.
256
257    The caller retains ownership of DELIMITERS.
258
259    This setting affects parsing of DP_DELIMITED files only. */
260 void
261 data_parser_set_hard_delimiters (struct data_parser *parser,
262                                  struct substring delimiters)
263 {
264   ss_dealloc (&parser->hard_seps);
265   ss_alloc_substring (&parser->hard_seps, delimiters);
266   set_any_sep (parser);
267 }
268
269 /* Returns the number of records per case. */
270 int
271 data_parser_get_records (const struct data_parser *parser)
272 {
273   return parser->records_per_case;
274 }
275
276 /* Sets the number of records per case to RECORDS_PER_CASE.
277
278    This setting affects parsing of DP_FIXED files only. */
279 void
280 data_parser_set_records (struct data_parser *parser, int records_per_case)
281 {
282   assert (records_per_case >= 0);
283   assert (records_per_case >= parser->records_per_case);
284   parser->records_per_case = records_per_case;
285 }
286
287 static void
288 add_field (struct data_parser *p, const struct fmt_spec *format, int case_idx,
289            const char *name, int record, int first_column)
290 {
291   struct field *field;
292
293   if (p->field_cnt == p->field_allocated)
294     p->fields = x2nrealloc (p->fields, &p->field_allocated, sizeof *p->fields);
295   field = &p->fields[p->field_cnt++];
296   field->format = *format;
297   field->case_idx = case_idx;
298   field->name = xstrdup (name);
299   field->record = record;
300   field->first_column = first_column;
301 }
302
303 /* Adds a delimited field to the field parsed by PARSER, which
304    must be configured as a DP_DELIMITED parser.  The field is
305    parsed as input format FORMAT.  Its data will be stored into case
306    index CASE_INDEX.  Errors in input data will be reported
307    against variable NAME. */
308 void
309 data_parser_add_delimited_field (struct data_parser *parser,
310                                  const struct fmt_spec *format, int case_idx,
311                                  const char *name)
312 {
313   assert (parser->type == DP_DELIMITED);
314   add_field (parser, format, case_idx, name, 0, 0);
315 }
316
317 /* Adds a fixed field to the field parsed by PARSER, which
318    must be configured as a DP_FIXED parser.  The field is
319    parsed as input format FORMAT.  Its data will be stored into case
320    index CASE_INDEX.  Errors in input data will be reported
321    against variable NAME.  The field will be drawn from the
322    FORMAT->w columns in 1-based RECORD starting at 1-based
323    column FIRST_COLUMN.
324
325    RECORD must be at least as great as that of any field already
326    added; that is, fields must be added in increasing order of
327    record number.  If RECORD is greater than the current number
328    of records per case, the number of records per case are
329    increased as needed.  */
330 void
331 data_parser_add_fixed_field (struct data_parser *parser,
332                              const struct fmt_spec *format, int case_idx,
333                              const char *name,
334                              int record, int first_column)
335 {
336   assert (parser->type == DP_FIXED);
337   assert (parser->field_cnt == 0
338           || record >= parser->fields[parser->field_cnt - 1].record);
339   if (record > parser->records_per_case)
340     parser->records_per_case = record;
341   add_field (parser, format, case_idx, name, record, first_column);
342 }
343
344 /* Returns true if any fields have been added to PARSER, false
345    otherwise. */
346 bool
347 data_parser_any_fields (const struct data_parser *parser)
348 {
349   return parser->field_cnt > 0;
350 }
351
352 static void
353 set_any_sep (struct data_parser *parser)
354 {
355   ds_assign_substring (&parser->any_sep, parser->soft_seps);
356   ds_put_substring (&parser->any_sep, parser->hard_seps);
357 }
358 \f
359 static bool parse_delimited_span (const struct data_parser *,
360                                   struct dfm_reader *, struct ccase *);
361 static bool parse_delimited_no_span (const struct data_parser *,
362                                      struct dfm_reader *, struct ccase *);
363 static bool parse_fixed (const struct data_parser *,
364                          struct dfm_reader *, struct ccase *);
365
366 /* Reads a case from DFM into C, parsing it with PARSER.  Returns
367    true if successful, false at end of file or on I/O error.
368
369    Case C must not be shared. */
370 bool
371 data_parser_parse (struct data_parser *parser, struct dfm_reader *reader,
372                    struct ccase *c)
373 {
374   bool retval;
375
376   assert (!case_is_shared (c));
377   assert (data_parser_any_fields (parser));
378
379   /* Skip the requested number of records before reading the
380      first case. */
381   for (; parser->skip_records > 0; parser->skip_records--)
382     {
383       if (dfm_eof (reader))
384         return false;
385       dfm_forward_record (reader);
386     }
387
388   /* Limit cases. */
389   if (parser->type == DP_DELIMITED)
390     {
391       if (parser->span)
392         retval = parse_delimited_span (parser, reader, c);
393       else
394         retval = parse_delimited_no_span (parser, reader, c);
395     }
396   else
397     retval = parse_fixed (parser, reader, c);
398
399   return retval;
400 }
401
402 /* Extracts a delimited field from the current position in the
403    current record according to PARSER, reading data from READER.
404
405    *FIELD is set to the field content.  The caller must not or
406    destroy this constant string.
407
408    Sets *FIRST_COLUMN to the 1-based column number of the start of
409    the extracted field, and *LAST_COLUMN to the end of the extracted
410    field.
411
412    Returns true on success, false on failure. */
413 static bool
414 cut_field (const struct data_parser *parser, struct dfm_reader *reader,
415            int *first_column, int *last_column, struct string *tmp,
416            struct substring *field)
417 {
418   size_t length_before_separators;
419   struct substring line, p;
420   bool quoted;
421
422   if (dfm_eof (reader))
423     return false;
424   if (ss_is_empty (parser->hard_seps))
425     dfm_expand_tabs (reader);
426   line = p = dfm_get_record (reader);
427
428   /* Skip leading soft separators. */
429   ss_ltrim (&p, parser->soft_seps);
430
431   /* Handle empty or completely consumed lines. */
432   if (ss_is_empty (p))
433     {
434       if (!parser->empty_line_has_field || dfm_columns_past_end (reader) > 0)
435         return false;
436       else
437         {
438           *field = p;
439           *first_column = dfm_column_start (reader);
440           *last_column = *first_column + 1;
441           dfm_forward_columns (reader, 1);
442           return true;
443         }
444     }
445
446   *first_column = dfm_column_start (reader);
447   quoted = ss_find_byte (parser->quotes, ss_first (p)) != SIZE_MAX;
448   if (quoted)
449     {
450       /* Quoted field. */
451       int quote = ss_get_byte (&p);
452       if (!ss_get_until (&p, quote, field))
453         msg (DW, _("Quoted string extends beyond end of line."));
454       if (parser->quote_escape && ss_first (p) == quote)
455         {
456           ds_assign_substring (tmp, *field);
457           while (ss_match_byte (&p, quote))
458             {
459               struct substring ss;
460               ds_put_byte (tmp, quote);
461               if (!ss_get_until (&p, quote, &ss))
462                 msg (DW, _("Quoted string extends beyond end of line."));
463               ds_put_substring (tmp, ss);
464             }
465           *field = ds_ss (tmp);
466         }
467       *last_column = *first_column + (ss_length (line) - ss_length (p));
468     }
469   else
470     {
471       /* Regular field. */
472       ss_get_bytes (&p, ss_cspan (p, ds_ss (&parser->any_sep)), field);
473       *last_column = *first_column + ss_length (*field);
474     }
475
476   /* Skip trailing soft separator and a single hard separator if present. */
477   length_before_separators = ss_length (p);
478   ss_ltrim (&p, parser->soft_seps);
479   if (!ss_is_empty (p)
480       && ss_find_byte (parser->hard_seps, ss_first (p)) != SIZE_MAX)
481     {
482       ss_advance (&p, 1);
483       ss_ltrim (&p, parser->soft_seps);
484     }
485   if (ss_is_empty (p))
486     dfm_forward_columns (reader, 1);
487   else if (quoted && length_before_separators == ss_length (p))
488     msg (DW, _("Missing delimiter following quoted string."));
489   dfm_forward_columns (reader, ss_length (line) - ss_length (p));
490
491   return true;
492 }
493
494 static void
495 parse_error (const struct dfm_reader *reader, const struct field *field,
496              int first_column, int last_column, char *error)
497 {
498   int line_number = dfm_get_line_number (reader);
499   struct msg_location *location = xmalloc (sizeof *location);
500   *location = (struct msg_location) {
501     .file_name = xstrdup (dfm_get_file_name (reader)),
502     .first_line = line_number,
503     .last_line = line_number + 1,
504     .first_column = first_column,
505     .last_column = last_column,
506   };
507   struct msg *m = xmalloc (sizeof *m);
508   *m = (struct msg) {
509     .category = MSG_C_DATA,
510     .severity = MSG_S_WARNING,
511     .location = location,
512     .text = xasprintf (_("Data for variable %s is not valid as format %s: %s"),
513                        field->name, fmt_name (field->format.type), error),
514   };
515   msg_emit (m);
516
517   free (error);
518 }
519
520 /* Reads a case from READER into C, parsing it according to
521    fixed-format syntax rules in PARSER.
522    Returns true if successful, false at end of file or on I/O error. */
523 static bool
524 parse_fixed (const struct data_parser *parser, struct dfm_reader *reader,
525              struct ccase *c)
526 {
527   const char *input_encoding = dfm_reader_get_encoding (reader);
528   const char *output_encoding = dict_get_encoding (parser->dict);
529   struct field *f;
530   int row;
531
532   if (dfm_eof (reader))
533     return false;
534
535   f = parser->fields;
536   for (row = 1; row <= parser->records_per_case; row++)
537     {
538       struct substring line;
539
540       if (dfm_eof (reader))
541         {
542           msg (DW, _("Partial case of %d of %d records discarded."),
543                row - 1, parser->records_per_case);
544           return false;
545         }
546       dfm_expand_tabs (reader);
547       line = dfm_get_record (reader);
548
549       for (; f < &parser->fields[parser->field_cnt] && f->record == row; f++)
550         {
551           struct substring s = ss_substr (line, f->first_column - 1,
552                                           f->format.w);
553           union value *value = case_data_rw_idx (c, f->case_idx);
554           char *error = data_in (s, input_encoding, f->format.type,
555                                  settings_get_fmt_settings (),
556                                  value, fmt_var_width (&f->format),
557                                  output_encoding);
558
559           if (error == NULL)
560             data_in_imply_decimals (s, input_encoding, f->format.type,
561                                     f->format.d, settings_get_fmt_settings (),
562                                     value);
563           else
564             parse_error (reader, f, f->first_column,
565                          f->first_column + f->format.w, error);
566         }
567
568       dfm_forward_record (reader);
569     }
570
571   return true;
572 }
573
574 /* Reads a case from READER into C, parsing it according to
575    free-format syntax rules in PARSER.
576    Returns true if successful, false at end of file or on I/O error. */
577 static bool
578 parse_delimited_span (const struct data_parser *parser,
579                       struct dfm_reader *reader, struct ccase *c)
580 {
581   const char *output_encoding = dict_get_encoding (parser->dict);
582   struct string tmp = DS_EMPTY_INITIALIZER;
583   struct field *f;
584
585   for (f = parser->fields; f < &parser->fields[parser->field_cnt]; f++)
586     {
587       struct substring s;
588       int first_column, last_column;
589       char *error;
590
591       /* Cut out a field and read in a new record if necessary. */
592       while (!cut_field (parser, reader,
593                          &first_column, &last_column, &tmp, &s))
594         {
595           if (!dfm_eof (reader))
596             dfm_forward_record (reader);
597           if (dfm_eof (reader))
598             {
599               if (f > parser->fields)
600                 msg (DW, _("Partial case discarded.  The first variable "
601                            "missing was %s."), f->name);
602               ds_destroy (&tmp);
603               return false;
604             }
605         }
606
607       const char *input_encoding = dfm_reader_get_encoding (reader);
608       error = data_in (s, input_encoding, f->format.type,
609                        settings_get_fmt_settings (),
610                        case_data_rw_idx (c, f->case_idx),
611                        fmt_var_width (&f->format), output_encoding);
612       if (error != NULL)
613         parse_error (reader, f, first_column, last_column, error);
614     }
615   ds_destroy (&tmp);
616   return true;
617 }
618
619 /* Reads a case from READER into C, parsing it according to
620    delimited syntax rules with one case per record in PARSER.
621    Returns true if successful, false at end of file or on I/O error. */
622 static bool
623 parse_delimited_no_span (const struct data_parser *parser,
624                          struct dfm_reader *reader, struct ccase *c)
625 {
626   const char *output_encoding = dict_get_encoding (parser->dict);
627   struct string tmp = DS_EMPTY_INITIALIZER;
628   struct substring s;
629   struct field *f, *end;
630
631   if (dfm_eof (reader))
632     return false;
633
634   end = &parser->fields[parser->field_cnt];
635   for (f = parser->fields; f < end; f++)
636     {
637       int first_column, last_column;
638       char *error;
639
640       if (!cut_field (parser, reader, &first_column, &last_column, &tmp, &s))
641         {
642           if (f < end - 1 && settings_get_undefined () && parser->warn_missing_fields)
643             msg (DW, _("Missing value(s) for all variables from %s onward.  "
644                        "These will be filled with the system-missing value "
645                        "or blanks, as appropriate."),
646                  f->name);
647           for (; f < end; f++)
648             value_set_missing (case_data_rw_idx (c, f->case_idx),
649                                fmt_var_width (&f->format));
650           goto exit;
651         }
652
653       const char *input_encoding = dfm_reader_get_encoding (reader);
654       error = data_in (s, input_encoding, f->format.type,
655                        settings_get_fmt_settings (),
656                        case_data_rw_idx (c, f->case_idx),
657                        fmt_var_width (&f->format), output_encoding);
658       if (error != NULL)
659         parse_error (reader, f, first_column, last_column, error);
660     }
661
662   s = dfm_get_record (reader);
663   ss_ltrim (&s, parser->soft_seps);
664   if (!ss_is_empty (s))
665     msg (DW, _("Record ends in data not part of any field."));
666
667 exit:
668   dfm_forward_record (reader);
669   ds_destroy (&tmp);
670   return true;
671 }
672 \f
673 /* Displays a table giving information on fixed-format variable
674    parsing on DATA LIST. */
675 static void
676 dump_fixed_table (const struct data_parser *parser,
677                   const struct file_handle *fh)
678 {
679   /* XXX This should not be preformatted. */
680   char *title = xasprintf (ngettext ("Reading %d record from %s.",
681                                      "Reading %d records from %s.",
682                                      parser->records_per_case),
683                            parser->records_per_case, fh_get_name (fh));
684   struct pivot_table *table = pivot_table_create__ (
685     pivot_value_new_user_text (title, -1), "Fixed Data Records");
686   free (title);
687
688   pivot_dimension_create (
689     table, PIVOT_AXIS_COLUMN, N_("Attributes"),
690     N_("Record"), N_("Columns"), N_("Format"));
691
692   struct pivot_dimension *variables = pivot_dimension_create (
693     table, PIVOT_AXIS_ROW, N_("Variable"));
694   variables->root->show_label = true;
695   for (size_t i = 0; i < parser->field_cnt; i++)
696     {
697       struct field *f = &parser->fields[i];
698
699       /* XXX It would be better to have the actual variable here. */
700       int variable_idx = pivot_category_create_leaf (
701         variables->root, pivot_value_new_user_text (f->name, -1));
702
703       pivot_table_put2 (table, 0, variable_idx,
704                         pivot_value_new_integer (f->record));
705
706       int first_column = f->first_column;
707       int last_column = f->first_column + f->format.w - 1;
708       char *columns = xasprintf ("%d-%d", first_column, last_column);
709       pivot_table_put2 (table, 1, variable_idx,
710                         pivot_value_new_user_text (columns, -1));
711       free (columns);
712
713       char str[FMT_STRING_LEN_MAX + 1];
714       pivot_table_put2 (table, 2, variable_idx,
715                         pivot_value_new_user_text (
716                           fmt_to_string (&f->format, str), -1));
717
718     }
719
720   pivot_table_submit (table);
721 }
722
723 /* Displays a table giving information on free-format variable parsing
724    on DATA LIST. */
725 static void
726 dump_delimited_table (const struct data_parser *parser,
727                       const struct file_handle *fh)
728 {
729   struct pivot_table *table = pivot_table_create__ (
730     pivot_value_new_text_format (N_("Reading free-form data from %s."),
731                                  fh_get_name (fh)),
732     "Free-Form Data Records");
733
734   pivot_dimension_create (
735     table, PIVOT_AXIS_COLUMN, N_("Attributes"), N_("Format"));
736
737   struct pivot_dimension *variables = pivot_dimension_create (
738     table, PIVOT_AXIS_ROW, N_("Variable"));
739   variables->root->show_label = true;
740   for (size_t i = 0; i < parser->field_cnt; i++)
741     {
742       struct field *f = &parser->fields[i];
743
744       /* XXX It would be better to have the actual variable here. */
745       int variable_idx = pivot_category_create_leaf (
746         variables->root, pivot_value_new_user_text (f->name, -1));
747
748       char str[FMT_STRING_LEN_MAX + 1];
749       pivot_table_put2 (table, 0, variable_idx,
750                         pivot_value_new_user_text (
751                           fmt_to_string (&f->format, str), -1));
752     }
753
754   pivot_table_submit (table);
755 }
756
757 /* Displays a table giving information on how PARSER will read
758    data from FH. */
759 void
760 data_parser_output_description (struct data_parser *parser,
761                                 const struct file_handle *fh)
762 {
763   if (parser->type == DP_FIXED)
764     dump_fixed_table (parser, fh);
765   else
766     dump_delimited_table (parser, fh);
767 }
768 \f
769 /* Data parser input program. */
770 struct data_parser_casereader
771   {
772     struct data_parser *parser; /* Parser. */
773     struct dfm_reader *reader;  /* Data file reader. */
774     struct caseproto *proto;    /* Format of cases. */
775   };
776
777 static const struct casereader_class data_parser_casereader_class;
778
779 /* Replaces DS's active dataset by an input program that reads data
780    from READER according to the rules in PARSER, using DICT as
781    the underlying dictionary.  Ownership of PARSER and READER is
782    transferred to the input program, and ownership of DICT is
783    transferred to the dataset. */
784 void
785 data_parser_make_active_file (struct data_parser *parser, struct dataset *ds,
786                                struct dfm_reader *reader,
787                                struct dictionary *dict,
788                                struct casereader* (*func)(struct casereader *,
789                                                           const struct dictionary *,
790                                                           void *),
791                                void *ud)
792 {
793   struct data_parser_casereader *r;
794   struct casereader *casereader0;
795   struct casereader *casereader1;
796
797   r = xmalloc (sizeof *r);
798   r->parser = parser;
799   r->reader = reader;
800   r->proto = caseproto_ref (dict_get_proto (dict));
801   casereader0 = casereader_create_sequential (NULL, r->proto,
802                                              CASENUMBER_MAX,
803                                              &data_parser_casereader_class, r);
804
805   if (func)
806     casereader1 = func (casereader0, dict, ud);
807   else
808     casereader1 = casereader0;
809
810   dataset_set_dict (ds, dict);
811   dataset_set_source (ds, casereader1);
812 }
813
814
815 static struct ccase *
816 data_parser_casereader_read (struct casereader *reader UNUSED, void *r_)
817 {
818   struct data_parser_casereader *r = r_;
819   struct ccase *c = case_create (r->proto);
820   if (data_parser_parse (r->parser, r->reader, c))
821     return c;
822   else
823     {
824       case_unref (c);
825       return NULL;
826     }
827 }
828
829 static void
830 data_parser_casereader_destroy (struct casereader *reader, void *r_)
831 {
832   struct data_parser_casereader *r = r_;
833   if (dfm_reader_error (r->reader))
834     casereader_force_error (reader);
835   dfm_close_reader (r->reader);
836   caseproto_unref (r->proto);
837   data_parser_destroy (r->parser);
838   free (r);
839 }
840
841 static const struct casereader_class data_parser_casereader_class =
842   {
843     data_parser_casereader_read,
844     data_parser_casereader_destroy,
845     NULL,
846     NULL,
847   };