treewide: Replace <name>_cnt by n_<name>s and <name>_cap by allocated_<name>.
[pspp] / src / language / data-io / data-parser.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013, 2016 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "language/data-io/data-parser.h"
20
21 #include <stdint.h>
22 #include <stdlib.h>
23
24 #include "data/casereader-provider.h"
25 #include "data/data-in.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/file-handle-def.h"
30 #include "data/settings.h"
31 #include "language/data-io/data-reader.h"
32 #include "libpspp/intern.h"
33 #include "libpspp/message.h"
34 #include "libpspp/str.h"
35 #include "output/pivot-table.h"
36
37 #include "gl/xalloc.h"
38
39 #include "gettext.h"
40 #define N_(msgid) msgid
41 #define _(msgid) gettext (msgid)
42
43 /* Data parser for textual data like that read by DATA LIST. */
44 struct data_parser
45   {
46     struct dictionary *dict;    /* Dictionary of destination */
47     enum data_parser_type type; /* Type of data to parse. */
48     int skip_records;           /* Records to skip before first real data. */
49
50     struct field *fields;       /* Fields to parse. */
51     size_t n_fields;            /* Number of fields. */
52     size_t field_allocated;     /* Number of fields spaced allocated for. */
53
54     /* DP_DELIMITED parsers only. */
55     bool span;                  /* May cases span multiple records? */
56     bool empty_line_has_field;  /* Does an empty line have an (empty) field? */
57     bool warn_missing_fields;   /* Should missing fields be considered errors? */
58     struct substring quotes;    /* Characters that can quote separators. */
59     bool quote_escape;          /* Doubled quote acts as escape? */
60     struct substring soft_seps; /* Two soft separators act like just one. */
61     struct substring hard_seps; /* Two hard separators yield empty fields. */
62     struct string any_sep;      /* Concatenation of soft_seps and hard_seps. */
63
64     /* DP_FIXED parsers only. */
65     int records_per_case;       /* Number of records in each case. */
66   };
67
68 /* How to parse one variable. */
69 struct field
70   {
71     struct fmt_spec format;     /* Input format of this field. */
72     int case_idx;               /* First value in case. */
73     char *name;                 /* Var name for error messages and tables. */
74
75     /* DP_FIXED only. */
76     int record;                 /* Record number (1-based). */
77     int first_column;           /* First column in record (1-based). */
78   };
79
80 static void set_any_sep (struct data_parser *parser);
81
82 /* Creates and returns a new data parser. */
83 struct data_parser *
84 data_parser_create (struct dictionary *dict)
85 {
86   struct data_parser *parser = xmalloc (sizeof *parser);
87
88   parser->type = DP_FIXED;
89   parser->skip_records = 0;
90
91   parser->fields = NULL;
92   parser->n_fields = 0;
93   parser->field_allocated = 0;
94   parser->dict = dict_ref (dict);
95
96   parser->span = true;
97   parser->empty_line_has_field = false;
98   parser->warn_missing_fields = true;
99   ss_alloc_substring (&parser->quotes, ss_cstr ("\"'"));
100   parser->quote_escape = false;
101   ss_alloc_substring (&parser->soft_seps, ss_cstr (CC_SPACES));
102   ss_alloc_substring (&parser->hard_seps, ss_cstr (","));
103   ds_init_empty (&parser->any_sep);
104   set_any_sep (parser);
105
106   parser->records_per_case = 0;
107
108   return parser;
109 }
110
111 /* Destroys PARSER. */
112 void
113 data_parser_destroy (struct data_parser *parser)
114 {
115   if (parser != NULL)
116     {
117       size_t i;
118
119       dict_unref (parser->dict);
120       for (i = 0; i < parser->n_fields; i++)
121         free (parser->fields[i].name);
122       free (parser->fields);
123       ss_dealloc (&parser->quotes);
124       ss_dealloc (&parser->soft_seps);
125       ss_dealloc (&parser->hard_seps);
126       ds_destroy (&parser->any_sep);
127       free (parser);
128     }
129 }
130
131 /* Returns the type of PARSER (either DP_DELIMITED or DP_FIXED). */
132 enum data_parser_type
133 data_parser_get_type (const struct data_parser *parser)
134 {
135   return parser->type;
136 }
137
138 /* Sets the type of PARSER to TYPE (either DP_DELIMITED or
139    DP_FIXED). */
140 void
141 data_parser_set_type (struct data_parser *parser, enum data_parser_type type)
142 {
143   assert (parser->n_fields == 0);
144   assert (type == DP_FIXED || type == DP_DELIMITED);
145   parser->type = type;
146 }
147
148 /* Configures PARSER to skip the specified number of
149    INITIAL_RECORDS_TO_SKIP before parsing any data.  By default,
150    no records are skipped. */
151 void
152 data_parser_set_skip (struct data_parser *parser, int initial_records_to_skip)
153 {
154   assert (initial_records_to_skip >= 0);
155   parser->skip_records = initial_records_to_skip;
156 }
157
158 /* Returns true if PARSER is configured to allow cases to span
159    multiple records. */
160 bool
161 data_parser_get_span (const struct data_parser *parser)
162 {
163   return parser->span;
164 }
165
166 /* If MAY_CASES_SPAN_RECORDS is true, configures PARSER to allow
167    a single case to span multiple records and multiple cases to
168    occupy a single record.  If MAY_CASES_SPAN_RECORDS is false,
169    configures PARSER to require each record to contain exactly
170    one case.
171
172    This setting affects parsing of DP_DELIMITED files only. */
173 void
174 data_parser_set_span (struct data_parser *parser, bool may_cases_span_records)
175 {
176   parser->span = may_cases_span_records;
177 }
178
179 /* If EMPTY_LINE_HAS_FIELD is true, configures PARSER to parse an
180    empty line as an empty field and to treat a hard delimiter
181    followed by end-of-line as an empty field.  If
182    EMPTY_LINE_HAS_FIELD is false, PARSER will skip empty lines
183    and hard delimiters at the end of lines without emitting empty
184    fields.
185
186    This setting affects parsing of DP_DELIMITED files only. */
187 void
188 data_parser_set_empty_line_has_field (struct data_parser *parser,
189                                       bool empty_line_has_field)
190 {
191   parser->empty_line_has_field = empty_line_has_field;
192 }
193
194
195 /* If WARN_MISSING_FIELDS is true, configures PARSER to emit a warning
196    and cause an error condition when a missing field is encountered.
197    If  WARN_MISSING_FIELDS is false, PARSER will silently fill such
198    fields with the system missing value.
199
200    This setting affects parsing of DP_DELIMITED files only. */
201 void
202 data_parser_set_warn_missing_fields (struct data_parser *parser,
203                                      bool warn_missing_fields)
204 {
205   parser->warn_missing_fields = warn_missing_fields;
206 }
207
208
209 /* Sets the characters that may be used for quoting field
210    contents to QUOTES.  If QUOTES is empty, quoting will be
211    disabled.
212
213    The caller retains ownership of QUOTES.
214
215    This setting affects parsing of DP_DELIMITED files only. */
216 void
217 data_parser_set_quotes (struct data_parser *parser, struct substring quotes)
218 {
219   ss_dealloc (&parser->quotes);
220   ss_alloc_substring (&parser->quotes, quotes);
221 }
222
223 /* If ESCAPE is false (the default setting), a character used for
224    quoting cannot itself be embedded within a quoted field.  If
225    ESCAPE is true, then a quote character can be embedded within
226    a quoted field by doubling it.
227
228    This setting affects parsing of DP_DELIMITED files only, and
229    only when at least one quote character has been set (with
230    data_parser_set_quotes). */
231 void
232 data_parser_set_quote_escape (struct data_parser *parser, bool escape)
233 {
234   parser->quote_escape = escape;
235 }
236
237 /* Sets PARSER's soft delimiters to DELIMITERS.  Soft delimiters
238    separate fields, but consecutive soft delimiters do not yield
239    empty fields.  (Ordinarily, only white space characters are
240    appropriate soft delimiters.)
241
242    The caller retains ownership of DELIMITERS.
243
244    This setting affects parsing of DP_DELIMITED files only. */
245 void
246 data_parser_set_soft_delimiters (struct data_parser *parser,
247                                  struct substring delimiters)
248 {
249   ss_dealloc (&parser->soft_seps);
250   ss_alloc_substring (&parser->soft_seps, delimiters);
251   set_any_sep (parser);
252 }
253
254 /* Sets PARSER's hard delimiters to DELIMITERS.  Hard delimiters
255    separate fields.  A consecutive pair of hard delimiters yield
256    an empty field.
257
258    The caller retains ownership of DELIMITERS.
259
260    This setting affects parsing of DP_DELIMITED files only. */
261 void
262 data_parser_set_hard_delimiters (struct data_parser *parser,
263                                  struct substring delimiters)
264 {
265   ss_dealloc (&parser->hard_seps);
266   ss_alloc_substring (&parser->hard_seps, delimiters);
267   set_any_sep (parser);
268 }
269
270 /* Returns the number of records per case. */
271 int
272 data_parser_get_records (const struct data_parser *parser)
273 {
274   return parser->records_per_case;
275 }
276
277 /* Sets the number of records per case to RECORDS_PER_CASE.
278
279    This setting affects parsing of DP_FIXED files only. */
280 void
281 data_parser_set_records (struct data_parser *parser, int records_per_case)
282 {
283   assert (records_per_case >= 0);
284   assert (records_per_case >= parser->records_per_case);
285   parser->records_per_case = records_per_case;
286 }
287
288 static void
289 add_field (struct data_parser *p, const struct fmt_spec *format, int case_idx,
290            const char *name, int record, int first_column)
291 {
292   struct field *field;
293
294   if (p->n_fields == p->field_allocated)
295     p->fields = x2nrealloc (p->fields, &p->field_allocated, sizeof *p->fields);
296   field = &p->fields[p->n_fields++];
297   field->format = *format;
298   field->case_idx = case_idx;
299   field->name = xstrdup (name);
300   field->record = record;
301   field->first_column = first_column;
302 }
303
304 /* Adds a delimited field to the field parsed by PARSER, which
305    must be configured as a DP_DELIMITED parser.  The field is
306    parsed as input format FORMAT.  Its data will be stored into case
307    index CASE_INDEX.  Errors in input data will be reported
308    against variable NAME. */
309 void
310 data_parser_add_delimited_field (struct data_parser *parser,
311                                  const struct fmt_spec *format, int case_idx,
312                                  const char *name)
313 {
314   assert (parser->type == DP_DELIMITED);
315   add_field (parser, format, case_idx, name, 0, 0);
316 }
317
318 /* Adds a fixed field to the field parsed by PARSER, which
319    must be configured as a DP_FIXED parser.  The field is
320    parsed as input format FORMAT.  Its data will be stored into case
321    index CASE_INDEX.  Errors in input data will be reported
322    against variable NAME.  The field will be drawn from the
323    FORMAT->w columns in 1-based RECORD starting at 1-based
324    column FIRST_COLUMN.
325
326    RECORD must be at least as great as that of any field already
327    added; that is, fields must be added in increasing order of
328    record number.  If RECORD is greater than the current number
329    of records per case, the number of records per case are
330    increased as needed.  */
331 void
332 data_parser_add_fixed_field (struct data_parser *parser,
333                              const struct fmt_spec *format, int case_idx,
334                              const char *name,
335                              int record, int first_column)
336 {
337   assert (parser->type == DP_FIXED);
338   assert (parser->n_fields == 0
339           || record >= parser->fields[parser->n_fields - 1].record);
340   if (record > parser->records_per_case)
341     parser->records_per_case = record;
342   add_field (parser, format, case_idx, name, record, first_column);
343 }
344
345 /* Returns true if any fields have been added to PARSER, false
346    otherwise. */
347 bool
348 data_parser_any_fields (const struct data_parser *parser)
349 {
350   return parser->n_fields > 0;
351 }
352
353 static void
354 set_any_sep (struct data_parser *parser)
355 {
356   ds_assign_substring (&parser->any_sep, parser->soft_seps);
357   ds_put_substring (&parser->any_sep, parser->hard_seps);
358 }
359 \f
360 static bool parse_delimited_span (const struct data_parser *,
361                                   struct dfm_reader *, struct ccase *);
362 static bool parse_delimited_no_span (const struct data_parser *,
363                                      struct dfm_reader *, struct ccase *);
364 static bool parse_fixed (const struct data_parser *,
365                          struct dfm_reader *, struct ccase *);
366
367 /* Reads a case from DFM into C, parsing it with PARSER.  Returns
368    true if successful, false at end of file or on I/O error.
369
370    Case C must not be shared. */
371 bool
372 data_parser_parse (struct data_parser *parser, struct dfm_reader *reader,
373                    struct ccase *c)
374 {
375   bool retval;
376
377   assert (!case_is_shared (c));
378   assert (data_parser_any_fields (parser));
379
380   /* Skip the requested number of records before reading the
381      first case. */
382   for (; parser->skip_records > 0; parser->skip_records--)
383     {
384       if (dfm_eof (reader))
385         return false;
386       dfm_forward_record (reader);
387     }
388
389   /* Limit cases. */
390   if (parser->type == DP_DELIMITED)
391     {
392       if (parser->span)
393         retval = parse_delimited_span (parser, reader, c);
394       else
395         retval = parse_delimited_no_span (parser, reader, c);
396     }
397   else
398     retval = parse_fixed (parser, reader, c);
399
400   return retval;
401 }
402
403 /* Extracts a delimited field from the current position in the
404    current record according to PARSER, reading data from READER.
405
406    *FIELD is set to the field content.  The caller must not or
407    destroy this constant string.
408
409    Sets *FIRST_COLUMN to the 1-based column number of the start of
410    the extracted field, and *LAST_COLUMN to the end of the extracted
411    field.
412
413    Returns true on success, false on failure. */
414 static bool
415 cut_field (const struct data_parser *parser, struct dfm_reader *reader,
416            int *first_column, int *last_column, struct string *tmp,
417            struct substring *field)
418 {
419   size_t length_before_separators;
420   struct substring line, p;
421   bool quoted;
422
423   if (dfm_eof (reader))
424     return false;
425   if (ss_is_empty (parser->hard_seps))
426     dfm_expand_tabs (reader);
427   line = p = dfm_get_record (reader);
428
429   /* Skip leading soft separators. */
430   ss_ltrim (&p, parser->soft_seps);
431
432   /* Handle empty or completely consumed lines. */
433   if (ss_is_empty (p))
434     {
435       if (!parser->empty_line_has_field || dfm_columns_past_end (reader) > 0)
436         return false;
437       else
438         {
439           *field = p;
440           *first_column = dfm_column_start (reader);
441           *last_column = *first_column + 1;
442           dfm_forward_columns (reader, 1);
443           return true;
444         }
445     }
446
447   *first_column = dfm_column_start (reader);
448   quoted = ss_find_byte (parser->quotes, ss_first (p)) != SIZE_MAX;
449   if (quoted)
450     {
451       /* Quoted field. */
452       int quote = ss_get_byte (&p);
453       if (!ss_get_until (&p, quote, field))
454         msg (DW, _("Quoted string extends beyond end of line."));
455       if (parser->quote_escape && ss_first (p) == quote)
456         {
457           ds_assign_substring (tmp, *field);
458           while (ss_match_byte (&p, quote))
459             {
460               struct substring ss;
461               ds_put_byte (tmp, quote);
462               if (!ss_get_until (&p, quote, &ss))
463                 msg (DW, _("Quoted string extends beyond end of line."));
464               ds_put_substring (tmp, ss);
465             }
466           *field = ds_ss (tmp);
467         }
468       *last_column = *first_column + (ss_length (line) - ss_length (p));
469     }
470   else
471     {
472       /* Regular field. */
473       ss_get_bytes (&p, ss_cspan (p, ds_ss (&parser->any_sep)), field);
474       *last_column = *first_column + ss_length (*field);
475     }
476
477   /* Skip trailing soft separator and a single hard separator if present. */
478   length_before_separators = ss_length (p);
479   ss_ltrim (&p, parser->soft_seps);
480   if (!ss_is_empty (p)
481       && ss_find_byte (parser->hard_seps, ss_first (p)) != SIZE_MAX)
482     {
483       ss_advance (&p, 1);
484       ss_ltrim (&p, parser->soft_seps);
485     }
486   if (ss_is_empty (p))
487     dfm_forward_columns (reader, 1);
488   else if (quoted && length_before_separators == ss_length (p))
489     msg (DW, _("Missing delimiter following quoted string."));
490   dfm_forward_columns (reader, ss_length (line) - ss_length (p));
491
492   return true;
493 }
494
495 static void
496 parse_error (const struct dfm_reader *reader, const struct field *field,
497              int first_column, int last_column, char *error)
498 {
499   int line_number = dfm_get_line_number (reader);
500   struct msg_location *location = xmalloc (sizeof *location);
501   *location = (struct msg_location) {
502     .file_name = intern_new (dfm_get_file_name (reader)),
503     .start = { .line = line_number, .column = first_column },
504     .end = { .line = line_number, .column = last_column - 1 },
505   };
506   struct msg *m = xmalloc (sizeof *m);
507   *m = (struct msg) {
508     .category = MSG_C_DATA,
509     .severity = MSG_S_WARNING,
510     .location = location,
511     .text = xasprintf (_("Data for variable %s is not valid as format %s: %s"),
512                        field->name, fmt_name (field->format.type), error),
513   };
514   msg_emit (m);
515
516   free (error);
517 }
518
519 /* Reads a case from READER into C, parsing it according to
520    fixed-format syntax rules in PARSER.
521    Returns true if successful, false at end of file or on I/O error. */
522 static bool
523 parse_fixed (const struct data_parser *parser, struct dfm_reader *reader,
524              struct ccase *c)
525 {
526   const char *input_encoding = dfm_reader_get_encoding (reader);
527   const char *output_encoding = dict_get_encoding (parser->dict);
528   struct field *f;
529   int row;
530
531   if (dfm_eof (reader))
532     return false;
533
534   f = parser->fields;
535   for (row = 1; row <= parser->records_per_case; row++)
536     {
537       struct substring line;
538
539       if (dfm_eof (reader))
540         {
541           msg (DW, _("Partial case of %d of %d records discarded."),
542                row - 1, parser->records_per_case);
543           return false;
544         }
545       dfm_expand_tabs (reader);
546       line = dfm_get_record (reader);
547
548       for (; f < &parser->fields[parser->n_fields] && f->record == row; f++)
549         {
550           struct substring s = ss_substr (line, f->first_column - 1,
551                                           f->format.w);
552           union value *value = case_data_rw_idx (c, f->case_idx);
553           char *error = data_in (s, input_encoding, f->format.type,
554                                  settings_get_fmt_settings (),
555                                  value, fmt_var_width (&f->format),
556                                  output_encoding);
557
558           if (error == NULL)
559             data_in_imply_decimals (s, input_encoding, f->format.type,
560                                     f->format.d, settings_get_fmt_settings (),
561                                     value);
562           else
563             parse_error (reader, f, f->first_column,
564                          f->first_column + f->format.w, error);
565         }
566
567       dfm_forward_record (reader);
568     }
569
570   return true;
571 }
572
573 /* Reads a case from READER into C, parsing it according to
574    free-format syntax rules in PARSER.
575    Returns true if successful, false at end of file or on I/O error. */
576 static bool
577 parse_delimited_span (const struct data_parser *parser,
578                       struct dfm_reader *reader, struct ccase *c)
579 {
580   const char *output_encoding = dict_get_encoding (parser->dict);
581   struct string tmp = DS_EMPTY_INITIALIZER;
582   struct field *f;
583
584   for (f = parser->fields; f < &parser->fields[parser->n_fields]; f++)
585     {
586       struct substring s;
587       int first_column, last_column;
588       char *error;
589
590       /* Cut out a field and read in a new record if necessary. */
591       while (!cut_field (parser, reader,
592                          &first_column, &last_column, &tmp, &s))
593         {
594           if (!dfm_eof (reader))
595             dfm_forward_record (reader);
596           if (dfm_eof (reader))
597             {
598               if (f > parser->fields)
599                 msg (DW, _("Partial case discarded.  The first variable "
600                            "missing was %s."), f->name);
601               ds_destroy (&tmp);
602               return false;
603             }
604         }
605
606       const char *input_encoding = dfm_reader_get_encoding (reader);
607       error = data_in (s, input_encoding, f->format.type,
608                        settings_get_fmt_settings (),
609                        case_data_rw_idx (c, f->case_idx),
610                        fmt_var_width (&f->format), output_encoding);
611       if (error != NULL)
612         parse_error (reader, f, first_column, last_column, error);
613     }
614   ds_destroy (&tmp);
615   return true;
616 }
617
618 /* Reads a case from READER into C, parsing it according to
619    delimited syntax rules with one case per record in PARSER.
620    Returns true if successful, false at end of file or on I/O error. */
621 static bool
622 parse_delimited_no_span (const struct data_parser *parser,
623                          struct dfm_reader *reader, struct ccase *c)
624 {
625   const char *output_encoding = dict_get_encoding (parser->dict);
626   struct string tmp = DS_EMPTY_INITIALIZER;
627   struct substring s;
628   struct field *f, *end;
629
630   if (dfm_eof (reader))
631     return false;
632
633   end = &parser->fields[parser->n_fields];
634   for (f = parser->fields; f < end; f++)
635     {
636       int first_column, last_column;
637       char *error;
638
639       if (!cut_field (parser, reader, &first_column, &last_column, &tmp, &s))
640         {
641           if (f < end - 1 && settings_get_undefined () && parser->warn_missing_fields)
642             msg (DW, _("Missing value(s) for all variables from %s onward.  "
643                        "These will be filled with the system-missing value "
644                        "or blanks, as appropriate."),
645                  f->name);
646           for (; f < end; f++)
647             value_set_missing (case_data_rw_idx (c, f->case_idx),
648                                fmt_var_width (&f->format));
649           goto exit;
650         }
651
652       const char *input_encoding = dfm_reader_get_encoding (reader);
653       error = data_in (s, input_encoding, f->format.type,
654                        settings_get_fmt_settings (),
655                        case_data_rw_idx (c, f->case_idx),
656                        fmt_var_width (&f->format), output_encoding);
657       if (error != NULL)
658         parse_error (reader, f, first_column, last_column, error);
659     }
660
661   s = dfm_get_record (reader);
662   ss_ltrim (&s, parser->soft_seps);
663   if (!ss_is_empty (s))
664     msg (DW, _("Record ends in data not part of any field."));
665
666 exit:
667   dfm_forward_record (reader);
668   ds_destroy (&tmp);
669   return true;
670 }
671 \f
672 /* Displays a table giving information on fixed-format variable
673    parsing on DATA LIST. */
674 static void
675 dump_fixed_table (const struct data_parser *parser,
676                   const struct file_handle *fh)
677 {
678   /* XXX This should not be preformatted. */
679   char *title = xasprintf (ngettext ("Reading %d record from %s.",
680                                      "Reading %d records from %s.",
681                                      parser->records_per_case),
682                            parser->records_per_case, fh_get_name (fh));
683   struct pivot_table *table = pivot_table_create__ (
684     pivot_value_new_user_text (title, -1), "Fixed Data Records");
685   free (title);
686
687   pivot_dimension_create (
688     table, PIVOT_AXIS_COLUMN, N_("Attributes"),
689     N_("Record"), N_("Columns"), N_("Format"));
690
691   struct pivot_dimension *variables = pivot_dimension_create (
692     table, PIVOT_AXIS_ROW, N_("Variable"));
693   variables->root->show_label = true;
694   for (size_t i = 0; i < parser->n_fields; i++)
695     {
696       struct field *f = &parser->fields[i];
697
698       /* XXX It would be better to have the actual variable here. */
699       int variable_idx = pivot_category_create_leaf (
700         variables->root, pivot_value_new_user_text (f->name, -1));
701
702       pivot_table_put2 (table, 0, variable_idx,
703                         pivot_value_new_integer (f->record));
704
705       int first_column = f->first_column;
706       int last_column = f->first_column + f->format.w - 1;
707       char *columns = xasprintf ("%d-%d", first_column, last_column);
708       pivot_table_put2 (table, 1, variable_idx,
709                         pivot_value_new_user_text (columns, -1));
710       free (columns);
711
712       char str[FMT_STRING_LEN_MAX + 1];
713       pivot_table_put2 (table, 2, variable_idx,
714                         pivot_value_new_user_text (
715                           fmt_to_string (&f->format, str), -1));
716
717     }
718
719   pivot_table_submit (table);
720 }
721
722 /* Displays a table giving information on free-format variable parsing
723    on DATA LIST. */
724 static void
725 dump_delimited_table (const struct data_parser *parser,
726                       const struct file_handle *fh)
727 {
728   struct pivot_table *table = pivot_table_create__ (
729     pivot_value_new_text_format (N_("Reading free-form data from %s."),
730                                  fh_get_name (fh)),
731     "Free-Form Data Records");
732
733   pivot_dimension_create (
734     table, PIVOT_AXIS_COLUMN, N_("Attributes"), N_("Format"));
735
736   struct pivot_dimension *variables = pivot_dimension_create (
737     table, PIVOT_AXIS_ROW, N_("Variable"));
738   variables->root->show_label = true;
739   for (size_t i = 0; i < parser->n_fields; i++)
740     {
741       struct field *f = &parser->fields[i];
742
743       /* XXX It would be better to have the actual variable here. */
744       int variable_idx = pivot_category_create_leaf (
745         variables->root, pivot_value_new_user_text (f->name, -1));
746
747       char str[FMT_STRING_LEN_MAX + 1];
748       pivot_table_put2 (table, 0, variable_idx,
749                         pivot_value_new_user_text (
750                           fmt_to_string (&f->format, str), -1));
751     }
752
753   pivot_table_submit (table);
754 }
755
756 /* Displays a table giving information on how PARSER will read
757    data from FH. */
758 void
759 data_parser_output_description (struct data_parser *parser,
760                                 const struct file_handle *fh)
761 {
762   if (parser->type == DP_FIXED)
763     dump_fixed_table (parser, fh);
764   else
765     dump_delimited_table (parser, fh);
766 }
767 \f
768 /* Data parser input program. */
769 struct data_parser_casereader
770   {
771     struct data_parser *parser; /* Parser. */
772     struct dfm_reader *reader;  /* Data file reader. */
773     struct caseproto *proto;    /* Format of cases. */
774   };
775
776 static const struct casereader_class data_parser_casereader_class;
777
778 /* Replaces DS's active dataset by an input program that reads data
779    from READER according to the rules in PARSER, using DICT as
780    the underlying dictionary.  Ownership of PARSER and READER is
781    transferred to the input program, and ownership of DICT is
782    transferred to the dataset. */
783 void
784 data_parser_make_active_file (struct data_parser *parser, struct dataset *ds,
785                                struct dfm_reader *reader,
786                                struct dictionary *dict,
787                                struct casereader* (*func)(struct casereader *,
788                                                           const struct dictionary *,
789                                                           void *),
790                                void *ud)
791 {
792   struct data_parser_casereader *r;
793   struct casereader *casereader0;
794   struct casereader *casereader1;
795
796   r = xmalloc (sizeof *r);
797   r->parser = parser;
798   r->reader = reader;
799   r->proto = caseproto_ref (dict_get_proto (dict));
800   casereader0 = casereader_create_sequential (NULL, r->proto,
801                                              CASENUMBER_MAX,
802                                              &data_parser_casereader_class, r);
803
804   if (func)
805     casereader1 = func (casereader0, dict, ud);
806   else
807     casereader1 = casereader0;
808
809   dataset_set_dict (ds, dict);
810   dataset_set_source (ds, casereader1);
811 }
812
813
814 static struct ccase *
815 data_parser_casereader_read (struct casereader *reader UNUSED, void *r_)
816 {
817   struct data_parser_casereader *r = r_;
818   struct ccase *c = case_create (r->proto);
819   if (data_parser_parse (r->parser, r->reader, c))
820     return c;
821   else
822     {
823       case_unref (c);
824       return NULL;
825     }
826 }
827
828 static void
829 data_parser_casereader_destroy (struct casereader *reader, void *r_)
830 {
831   struct data_parser_casereader *r = r_;
832   if (dfm_reader_error (r->reader))
833     casereader_force_error (reader);
834   dfm_close_reader (r->reader);
835   caseproto_unref (r->proto);
836   data_parser_destroy (r->parser);
837   free (r);
838 }
839
840 static const struct casereader_class data_parser_casereader_class =
841   {
842     data_parser_casereader_read,
843     data_parser_casereader_destroy,
844     NULL,
845     NULL,
846   };