e7896f6bbef1935bfa733bfbaed937035bc91f62
[pspp] / src / language / data-io / data-parser.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013, 2016 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "language/data-io/data-parser.h"
20
21 #include <stdint.h>
22 #include <stdlib.h>
23
24 #include "data/casereader-provider.h"
25 #include "data/data-in.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/file-handle-def.h"
30 #include "data/settings.h"
31 #include "language/data-io/data-reader.h"
32 #include "libpspp/intern.h"
33 #include "libpspp/message.h"
34 #include "libpspp/str.h"
35 #include "output/pivot-table.h"
36
37 #include "gl/xalloc.h"
38
39 #include "gettext.h"
40 #define N_(msgid) msgid
41 #define _(msgid) gettext (msgid)
42
43 /* Data parser for textual data like that read by DATA LIST. */
44 struct data_parser
45   {
46     enum data_parser_type type; /* Type of data to parse. */
47     int skip_records;           /* Records to skip before first real data. */
48
49     struct field *fields;       /* Fields to parse. */
50     size_t n_fields;            /* Number of fields. */
51     size_t field_allocated;     /* Number of fields spaced allocated for. */
52
53     /* DP_DELIMITED parsers only. */
54     bool span;                  /* May cases span multiple records? */
55     bool empty_line_has_field;  /* Does an empty line have an (empty) field? */
56     bool warn_missing_fields;   /* Should missing fields be considered errors? */
57     struct substring quotes;    /* Characters that can quote separators. */
58     bool quote_escape;          /* Doubled quote acts as escape? */
59     struct substring soft_seps; /* Two soft separators act like just one. */
60     struct substring hard_seps; /* Two hard separators yield empty fields. */
61     struct string any_sep;      /* Concatenation of soft_seps and hard_seps. */
62
63     /* DP_FIXED parsers only. */
64     int records_per_case;       /* Number of records in each case. */
65   };
66
67 /* How to parse one variable. */
68 struct field
69   {
70     struct fmt_spec format;     /* Input format of this field. */
71     int case_idx;               /* First value in case. */
72     char *name;                 /* Var name for error messages and tables. */
73
74     /* DP_FIXED only. */
75     int record;                 /* Record number (1-based). */
76     int first_column;           /* First column in record (1-based). */
77   };
78
79 static void set_any_sep (struct data_parser *parser);
80
81 /* Creates and returns a new data parser. */
82 struct data_parser *
83 data_parser_create (void)
84 {
85   struct data_parser *parser = xmalloc (sizeof *parser);
86
87   parser->type = DP_FIXED;
88   parser->skip_records = 0;
89
90   parser->fields = NULL;
91   parser->n_fields = 0;
92   parser->field_allocated = 0;
93
94   parser->span = true;
95   parser->empty_line_has_field = false;
96   parser->warn_missing_fields = true;
97   ss_alloc_substring (&parser->quotes, ss_cstr ("\"'"));
98   parser->quote_escape = false;
99   ss_alloc_substring (&parser->soft_seps, ss_cstr (CC_SPACES));
100   ss_alloc_substring (&parser->hard_seps, ss_cstr (","));
101   ds_init_empty (&parser->any_sep);
102   set_any_sep (parser);
103
104   parser->records_per_case = 0;
105
106   return parser;
107 }
108
109 /* Destroys PARSER. */
110 void
111 data_parser_destroy (struct data_parser *parser)
112 {
113   if (parser != NULL)
114     {
115       size_t i;
116
117       for (i = 0; i < parser->n_fields; i++)
118         free (parser->fields[i].name);
119       free (parser->fields);
120       ss_dealloc (&parser->quotes);
121       ss_dealloc (&parser->soft_seps);
122       ss_dealloc (&parser->hard_seps);
123       ds_destroy (&parser->any_sep);
124       free (parser);
125     }
126 }
127
128 /* Returns the type of PARSER (either DP_DELIMITED or DP_FIXED). */
129 enum data_parser_type
130 data_parser_get_type (const struct data_parser *parser)
131 {
132   return parser->type;
133 }
134
135 /* Sets the type of PARSER to TYPE (either DP_DELIMITED or
136    DP_FIXED). */
137 void
138 data_parser_set_type (struct data_parser *parser, enum data_parser_type type)
139 {
140   assert (parser->n_fields == 0);
141   assert (type == DP_FIXED || type == DP_DELIMITED);
142   parser->type = type;
143 }
144
145 /* Configures PARSER to skip the specified number of
146    INITIAL_RECORDS_TO_SKIP before parsing any data.  By default,
147    no records are skipped. */
148 void
149 data_parser_set_skip (struct data_parser *parser, int initial_records_to_skip)
150 {
151   assert (initial_records_to_skip >= 0);
152   parser->skip_records = initial_records_to_skip;
153 }
154
155 /* Returns true if PARSER is configured to allow cases to span
156    multiple records. */
157 bool
158 data_parser_get_span (const struct data_parser *parser)
159 {
160   return parser->span;
161 }
162
163 /* If MAY_CASES_SPAN_RECORDS is true, configures PARSER to allow
164    a single case to span multiple records and multiple cases to
165    occupy a single record.  If MAY_CASES_SPAN_RECORDS is false,
166    configures PARSER to require each record to contain exactly
167    one case.
168
169    This setting affects parsing of DP_DELIMITED files only. */
170 void
171 data_parser_set_span (struct data_parser *parser, bool may_cases_span_records)
172 {
173   parser->span = may_cases_span_records;
174 }
175
176 /* If EMPTY_LINE_HAS_FIELD is true, configures PARSER to parse an
177    empty line as an empty field and to treat a hard delimiter
178    followed by end-of-line as an empty field.  If
179    EMPTY_LINE_HAS_FIELD is false, PARSER will skip empty lines
180    and hard delimiters at the end of lines without emitting empty
181    fields.
182
183    This setting affects parsing of DP_DELIMITED files only. */
184 void
185 data_parser_set_empty_line_has_field (struct data_parser *parser,
186                                       bool empty_line_has_field)
187 {
188   parser->empty_line_has_field = empty_line_has_field;
189 }
190
191
192 /* If WARN_MISSING_FIELDS is true, configures PARSER to emit a warning
193    and cause an error condition when a missing field is encountered.
194    If  WARN_MISSING_FIELDS is false, PARSER will silently fill such
195    fields with the system missing value.
196
197    This setting affects parsing of DP_DELIMITED files only. */
198 void
199 data_parser_set_warn_missing_fields (struct data_parser *parser,
200                                      bool warn_missing_fields)
201 {
202   parser->warn_missing_fields = warn_missing_fields;
203 }
204
205
206 /* Sets the characters that may be used for quoting field
207    contents to QUOTES.  If QUOTES is empty, quoting will be
208    disabled.
209
210    The caller retains ownership of QUOTES.
211
212    This setting affects parsing of DP_DELIMITED files only. */
213 void
214 data_parser_set_quotes (struct data_parser *parser, struct substring quotes)
215 {
216   ss_dealloc (&parser->quotes);
217   ss_alloc_substring (&parser->quotes, quotes);
218 }
219
220 /* If ESCAPE is false (the default setting), a character used for
221    quoting cannot itself be embedded within a quoted field.  If
222    ESCAPE is true, then a quote character can be embedded within
223    a quoted field by doubling it.
224
225    This setting affects parsing of DP_DELIMITED files only, and
226    only when at least one quote character has been set (with
227    data_parser_set_quotes). */
228 void
229 data_parser_set_quote_escape (struct data_parser *parser, bool escape)
230 {
231   parser->quote_escape = escape;
232 }
233
234 /* Sets PARSER's soft delimiters to DELIMITERS.  Soft delimiters
235    separate fields, but consecutive soft delimiters do not yield
236    empty fields.  (Ordinarily, only white space characters are
237    appropriate soft delimiters.)
238
239    The caller retains ownership of DELIMITERS.
240
241    This setting affects parsing of DP_DELIMITED files only. */
242 void
243 data_parser_set_soft_delimiters (struct data_parser *parser,
244                                  struct substring delimiters)
245 {
246   ss_dealloc (&parser->soft_seps);
247   ss_alloc_substring (&parser->soft_seps, delimiters);
248   set_any_sep (parser);
249 }
250
251 /* Sets PARSER's hard delimiters to DELIMITERS.  Hard delimiters
252    separate fields.  A consecutive pair of hard delimiters yield
253    an empty field.
254
255    The caller retains ownership of DELIMITERS.
256
257    This setting affects parsing of DP_DELIMITED files only. */
258 void
259 data_parser_set_hard_delimiters (struct data_parser *parser,
260                                  struct substring delimiters)
261 {
262   ss_dealloc (&parser->hard_seps);
263   ss_alloc_substring (&parser->hard_seps, delimiters);
264   set_any_sep (parser);
265 }
266
267 /* Returns the number of records per case. */
268 int
269 data_parser_get_records (const struct data_parser *parser)
270 {
271   return parser->records_per_case;
272 }
273
274 /* Sets the number of records per case to RECORDS_PER_CASE.
275
276    This setting affects parsing of DP_FIXED files only. */
277 void
278 data_parser_set_records (struct data_parser *parser, int records_per_case)
279 {
280   assert (records_per_case >= 0);
281   assert (records_per_case >= parser->records_per_case);
282   parser->records_per_case = records_per_case;
283 }
284
285 static void
286 add_field (struct data_parser *p, const struct fmt_spec *format, int case_idx,
287            const char *name, int record, int first_column)
288 {
289   struct field *field;
290
291   if (p->n_fields == p->field_allocated)
292     p->fields = x2nrealloc (p->fields, &p->field_allocated, sizeof *p->fields);
293   field = &p->fields[p->n_fields++];
294   field->format = *format;
295   field->case_idx = case_idx;
296   field->name = xstrdup (name);
297   field->record = record;
298   field->first_column = first_column;
299 }
300
301 /* Adds a delimited field to the field parsed by PARSER, which
302    must be configured as a DP_DELIMITED parser.  The field is
303    parsed as input format FORMAT.  Its data will be stored into case
304    index CASE_INDEX.  Errors in input data will be reported
305    against variable NAME. */
306 void
307 data_parser_add_delimited_field (struct data_parser *parser,
308                                  const struct fmt_spec *format, int case_idx,
309                                  const char *name)
310 {
311   assert (parser->type == DP_DELIMITED);
312   add_field (parser, format, case_idx, name, 0, 0);
313 }
314
315 /* Adds a fixed field to the field parsed by PARSER, which
316    must be configured as a DP_FIXED parser.  The field is
317    parsed as input format FORMAT.  Its data will be stored into case
318    index CASE_INDEX.  Errors in input data will be reported
319    against variable NAME.  The field will be drawn from the
320    FORMAT->w columns in 1-based RECORD starting at 1-based
321    column FIRST_COLUMN.
322
323    RECORD must be at least as great as that of any field already
324    added; that is, fields must be added in increasing order of
325    record number.  If RECORD is greater than the current number
326    of records per case, the number of records per case are
327    increased as needed.  */
328 void
329 data_parser_add_fixed_field (struct data_parser *parser,
330                              const struct fmt_spec *format, int case_idx,
331                              const char *name,
332                              int record, int first_column)
333 {
334   assert (parser->type == DP_FIXED);
335   assert (parser->n_fields == 0
336           || record >= parser->fields[parser->n_fields - 1].record);
337   if (record > parser->records_per_case)
338     parser->records_per_case = record;
339   add_field (parser, format, case_idx, name, record, first_column);
340 }
341
342 /* Returns true if any fields have been added to PARSER, false
343    otherwise. */
344 bool
345 data_parser_any_fields (const struct data_parser *parser)
346 {
347   return parser->n_fields > 0;
348 }
349
350 static void
351 set_any_sep (struct data_parser *parser)
352 {
353   ds_assign_substring (&parser->any_sep, parser->soft_seps);
354   ds_put_substring (&parser->any_sep, parser->hard_seps);
355 }
356 \f
357 static bool parse_delimited_span (const struct data_parser *,
358                                   struct dfm_reader *,
359                                   struct dictionary *, struct ccase *);
360 static bool parse_delimited_no_span (const struct data_parser *,
361                                      struct dfm_reader *,
362                                      struct dictionary *, struct ccase *);
363 static bool parse_fixed (const struct data_parser *, struct dfm_reader *,
364                          struct dictionary *, struct ccase *);
365
366 /* Reads a case from DFM into C, which matches dictionary DICT, parsing it with
367    PARSER.  Returns true if successful, false at end of file or on I/O error.
368
369    Case C must not be shared. */
370 bool
371 data_parser_parse (struct data_parser *parser, struct dfm_reader *reader,
372                    struct dictionary *dict, struct ccase *c)
373 {
374   bool retval;
375
376   assert (!case_is_shared (c));
377   assert (data_parser_any_fields (parser));
378
379   /* Skip the requested number of records before reading the
380      first case. */
381   for (; parser->skip_records > 0; parser->skip_records--)
382     {
383       if (dfm_eof (reader))
384         return false;
385       dfm_forward_record (reader);
386     }
387
388   /* Limit cases. */
389   if (parser->type == DP_DELIMITED)
390     {
391       if (parser->span)
392         retval = parse_delimited_span (parser, reader, dict, c);
393       else
394         retval = parse_delimited_no_span (parser, reader, dict, c);
395     }
396   else
397     retval = parse_fixed (parser, reader, dict, c);
398
399   return retval;
400 }
401
402 /* Extracts a delimited field from the current position in the
403    current record according to PARSER, reading data from READER.
404
405    *FIELD is set to the field content.  The caller must not or
406    destroy this constant string.
407
408    Sets *FIRST_COLUMN to the 1-based column number of the start of
409    the extracted field, and *LAST_COLUMN to the end of the extracted
410    field.
411
412    Returns true on success, false on failure. */
413 static bool
414 cut_field (const struct data_parser *parser, struct dfm_reader *reader,
415            int *first_column, int *last_column, struct string *tmp,
416            struct substring *field)
417 {
418   size_t length_before_separators;
419   struct substring line, p;
420   bool quoted;
421
422   if (dfm_eof (reader))
423     return false;
424   if (ss_is_empty (parser->hard_seps))
425     dfm_expand_tabs (reader);
426   line = p = dfm_get_record (reader);
427
428   /* Skip leading soft separators. */
429   ss_ltrim (&p, parser->soft_seps);
430
431   /* Handle empty or completely consumed lines. */
432   if (ss_is_empty (p))
433     {
434       if (!parser->empty_line_has_field || dfm_columns_past_end (reader) > 0)
435         return false;
436       else
437         {
438           *field = p;
439           *first_column = dfm_column_start (reader);
440           *last_column = *first_column + 1;
441           dfm_forward_columns (reader, 1);
442           return true;
443         }
444     }
445
446   *first_column = dfm_column_start (reader);
447   quoted = ss_find_byte (parser->quotes, ss_first (p)) != SIZE_MAX;
448   if (quoted)
449     {
450       /* Quoted field. */
451       int quote = ss_get_byte (&p);
452       if (!ss_get_until (&p, quote, field))
453         msg (DW, _("Quoted string extends beyond end of line."));
454       if (parser->quote_escape && ss_first (p) == quote)
455         {
456           ds_assign_substring (tmp, *field);
457           while (ss_match_byte (&p, quote))
458             {
459               struct substring ss;
460               ds_put_byte (tmp, quote);
461               if (!ss_get_until (&p, quote, &ss))
462                 msg (DW, _("Quoted string extends beyond end of line."));
463               ds_put_substring (tmp, ss);
464             }
465           *field = ds_ss (tmp);
466         }
467       *last_column = *first_column + (ss_length (line) - ss_length (p));
468     }
469   else
470     {
471       /* Regular field. */
472       ss_get_bytes (&p, ss_cspan (p, ds_ss (&parser->any_sep)), field);
473       *last_column = *first_column + ss_length (*field);
474     }
475
476   /* Skip trailing soft separator and a single hard separator if present. */
477   length_before_separators = ss_length (p);
478   ss_ltrim (&p, parser->soft_seps);
479   if (!ss_is_empty (p)
480       && ss_find_byte (parser->hard_seps, ss_first (p)) != SIZE_MAX)
481     {
482       ss_advance (&p, 1);
483       ss_ltrim (&p, parser->soft_seps);
484     }
485   if (ss_is_empty (p))
486     dfm_forward_columns (reader, 1);
487   else if (quoted && length_before_separators == ss_length (p))
488     msg (DW, _("Missing delimiter following quoted string."));
489   dfm_forward_columns (reader, ss_length (line) - ss_length (p));
490
491   return true;
492 }
493
494 static void
495 parse_error (const struct dfm_reader *reader, const struct field *field,
496              int first_column, int last_column, char *error)
497 {
498   int line_number = dfm_get_line_number (reader);
499   struct msg_location *location = xmalloc (sizeof *location);
500   *location = (struct msg_location) {
501     .file_name = intern_new (dfm_get_file_name (reader)),
502     .start = { .line = line_number, .column = first_column },
503     .end = { .line = line_number, .column = last_column - 1 },
504   };
505   struct msg *m = xmalloc (sizeof *m);
506   *m = (struct msg) {
507     .category = MSG_C_DATA,
508     .severity = MSG_S_WARNING,
509     .location = location,
510     .text = xasprintf (_("Data for variable %s is not valid as format %s: %s"),
511                        field->name, fmt_name (field->format.type), error),
512   };
513   msg_emit (m);
514
515   free (error);
516 }
517
518 /* Reads a case from READER into C, which matches DICT, parsing it according to
519    fixed-format syntax rules in PARSER.  Returns true if successful, false at
520    end of file or on I/O error. */
521 static bool
522 parse_fixed (const struct data_parser *parser, struct dfm_reader *reader,
523              struct dictionary *dict, struct ccase *c)
524 {
525   const char *input_encoding = dfm_reader_get_encoding (reader);
526   const char *output_encoding = dict_get_encoding (dict);
527   struct field *f;
528   int row;
529
530   if (dfm_eof (reader))
531     return false;
532
533   f = parser->fields;
534   for (row = 1; row <= parser->records_per_case; row++)
535     {
536       struct substring line;
537
538       if (dfm_eof (reader))
539         {
540           msg (DW, _("Partial case of %d of %d records discarded."),
541                row - 1, parser->records_per_case);
542           return false;
543         }
544       dfm_expand_tabs (reader);
545       line = dfm_get_record (reader);
546
547       for (; f < &parser->fields[parser->n_fields] && f->record == row; f++)
548         {
549           struct substring s = ss_substr (line, f->first_column - 1,
550                                           f->format.w);
551           union value *value = case_data_rw_idx (c, f->case_idx);
552           char *error = data_in (s, input_encoding, f->format.type,
553                                  settings_get_fmt_settings (),
554                                  value, fmt_var_width (&f->format),
555                                  output_encoding);
556
557           if (error == NULL)
558             data_in_imply_decimals (s, input_encoding, f->format.type,
559                                     f->format.d, settings_get_fmt_settings (),
560                                     value);
561           else
562             parse_error (reader, f, f->first_column,
563                          f->first_column + f->format.w, error);
564         }
565
566       dfm_forward_record (reader);
567     }
568
569   return true;
570 }
571
572 /* Reads a case from READER into C, which matches dictionary DICT, parsing it
573    according to free-format syntax rules in PARSER.  Returns true if
574    successful, false at end of file or on I/O error. */
575 static bool
576 parse_delimited_span (const struct data_parser *parser,
577                       struct dfm_reader *reader,
578                       struct dictionary *dict, struct ccase *c)
579 {
580   const char *output_encoding = dict_get_encoding (dict);
581   struct string tmp = DS_EMPTY_INITIALIZER;
582   struct field *f;
583
584   for (f = parser->fields; f < &parser->fields[parser->n_fields]; f++)
585     {
586       struct substring s;
587       int first_column, last_column;
588       char *error;
589
590       /* Cut out a field and read in a new record if necessary. */
591       while (!cut_field (parser, reader,
592                          &first_column, &last_column, &tmp, &s))
593         {
594           if (!dfm_eof (reader))
595             dfm_forward_record (reader);
596           if (dfm_eof (reader))
597             {
598               if (f > parser->fields)
599                 msg (DW, _("Partial case discarded.  The first variable "
600                            "missing was %s."), f->name);
601               ds_destroy (&tmp);
602               return false;
603             }
604         }
605
606       const char *input_encoding = dfm_reader_get_encoding (reader);
607       error = data_in (s, input_encoding, f->format.type,
608                        settings_get_fmt_settings (),
609                        case_data_rw_idx (c, f->case_idx),
610                        fmt_var_width (&f->format), output_encoding);
611       if (error != NULL)
612         parse_error (reader, f, first_column, last_column, error);
613     }
614   ds_destroy (&tmp);
615   return true;
616 }
617
618 /* Reads a case from READER into C, which matches dictionary DICT, parsing it
619    according to delimited syntax rules with one case per record in PARSER.
620    Returns true if successful, false at end of file or on I/O error. */
621 static bool
622 parse_delimited_no_span (const struct data_parser *parser,
623                          struct dfm_reader *reader,
624                          struct dictionary *dict, struct ccase *c)
625 {
626   const char *output_encoding = dict_get_encoding (dict);
627   struct string tmp = DS_EMPTY_INITIALIZER;
628   struct substring s;
629   struct field *f, *end;
630
631   if (dfm_eof (reader))
632     return false;
633
634   end = &parser->fields[parser->n_fields];
635   for (f = parser->fields; f < end; f++)
636     {
637       int first_column, last_column;
638       char *error;
639
640       if (!cut_field (parser, reader, &first_column, &last_column, &tmp, &s))
641         {
642           if (f < end - 1 && settings_get_undefined () && parser->warn_missing_fields)
643             msg (DW, _("Missing value(s) for all variables from %s onward.  "
644                        "These will be filled with the system-missing value "
645                        "or blanks, as appropriate."),
646                  f->name);
647           for (; f < end; f++)
648             value_set_missing (case_data_rw_idx (c, f->case_idx),
649                                fmt_var_width (&f->format));
650           goto exit;
651         }
652
653       const char *input_encoding = dfm_reader_get_encoding (reader);
654       error = data_in (s, input_encoding, f->format.type,
655                        settings_get_fmt_settings (),
656                        case_data_rw_idx (c, f->case_idx),
657                        fmt_var_width (&f->format), output_encoding);
658       if (error != NULL)
659         parse_error (reader, f, first_column, last_column, error);
660     }
661
662   s = dfm_get_record (reader);
663   ss_ltrim (&s, parser->soft_seps);
664   if (!ss_is_empty (s))
665     msg (DW, _("Record ends in data not part of any field."));
666
667 exit:
668   dfm_forward_record (reader);
669   ds_destroy (&tmp);
670   return true;
671 }
672 \f
673 /* Displays a table giving information on fixed-format variable
674    parsing on DATA LIST. */
675 static void
676 dump_fixed_table (const struct data_parser *parser,
677                   const struct file_handle *fh)
678 {
679   /* XXX This should not be preformatted. */
680   char *title = xasprintf (ngettext ("Reading %d record from %s.",
681                                      "Reading %d records from %s.",
682                                      parser->records_per_case),
683                            parser->records_per_case, fh_get_name (fh));
684   struct pivot_table *table = pivot_table_create__ (
685     pivot_value_new_user_text (title, -1), "Fixed Data Records");
686   free (title);
687
688   pivot_dimension_create (
689     table, PIVOT_AXIS_COLUMN, N_("Attributes"),
690     N_("Record"), N_("Columns"), N_("Format"));
691
692   struct pivot_dimension *variables = pivot_dimension_create (
693     table, PIVOT_AXIS_ROW, N_("Variable"));
694   variables->root->show_label = true;
695   for (size_t i = 0; i < parser->n_fields; i++)
696     {
697       struct field *f = &parser->fields[i];
698
699       /* XXX It would be better to have the actual variable here. */
700       int variable_idx = pivot_category_create_leaf (
701         variables->root, pivot_value_new_user_text (f->name, -1));
702
703       pivot_table_put2 (table, 0, variable_idx,
704                         pivot_value_new_integer (f->record));
705
706       int first_column = f->first_column;
707       int last_column = f->first_column + f->format.w - 1;
708       char *columns = xasprintf ("%d-%d", first_column, last_column);
709       pivot_table_put2 (table, 1, variable_idx,
710                         pivot_value_new_user_text (columns, -1));
711       free (columns);
712
713       char str[FMT_STRING_LEN_MAX + 1];
714       pivot_table_put2 (table, 2, variable_idx,
715                         pivot_value_new_user_text (
716                           fmt_to_string (&f->format, str), -1));
717
718     }
719
720   pivot_table_submit (table);
721 }
722
723 /* Displays a table giving information on free-format variable parsing
724    on DATA LIST. */
725 static void
726 dump_delimited_table (const struct data_parser *parser,
727                       const struct file_handle *fh)
728 {
729   struct pivot_table *table = pivot_table_create__ (
730     pivot_value_new_text_format (N_("Reading free-form data from %s."),
731                                  fh_get_name (fh)),
732     "Free-Form Data Records");
733
734   pivot_dimension_create (
735     table, PIVOT_AXIS_COLUMN, N_("Attributes"), N_("Format"));
736
737   struct pivot_dimension *variables = pivot_dimension_create (
738     table, PIVOT_AXIS_ROW, N_("Variable"));
739   variables->root->show_label = true;
740   for (size_t i = 0; i < parser->n_fields; i++)
741     {
742       struct field *f = &parser->fields[i];
743
744       /* XXX It would be better to have the actual variable here. */
745       int variable_idx = pivot_category_create_leaf (
746         variables->root, pivot_value_new_user_text (f->name, -1));
747
748       char str[FMT_STRING_LEN_MAX + 1];
749       pivot_table_put2 (table, 0, variable_idx,
750                         pivot_value_new_user_text (
751                           fmt_to_string (&f->format, str), -1));
752     }
753
754   pivot_table_submit (table);
755 }
756
757 /* Displays a table giving information on how PARSER will read
758    data from FH. */
759 void
760 data_parser_output_description (struct data_parser *parser,
761                                 const struct file_handle *fh)
762 {
763   if (parser->type == DP_FIXED)
764     dump_fixed_table (parser, fh);
765   else
766     dump_delimited_table (parser, fh);
767 }
768 \f
769 /* Data parser input program. */
770 struct data_parser_casereader
771   {
772     struct data_parser *parser; /* Parser. */
773     struct dictionary *dict;    /* Dictionary. */
774     struct dfm_reader *reader;  /* Data file reader. */
775     struct caseproto *proto;    /* Format of cases. */
776   };
777
778 static const struct casereader_class data_parser_casereader_class;
779
780 /* Replaces DS's active dataset by an input program that reads data
781    from READER according to the rules in PARSER, using DICT as
782    the underlying dictionary.  Ownership of PARSER and READER is
783    transferred to the input program, and ownership of DICT is
784    transferred to the dataset. */
785 void
786 data_parser_make_active_file (struct data_parser *parser, struct dataset *ds,
787                                struct dfm_reader *reader,
788                               struct dictionary *dict,
789                                struct casereader* (*func)(struct casereader *,
790                                                           const struct dictionary *,
791                                                           void *),
792                                void *ud)
793 {
794   struct data_parser_casereader *r;
795   struct casereader *casereader0;
796   struct casereader *casereader1;
797
798   r = xmalloc (sizeof *r);
799   r->parser = parser;
800   r->dict = dict_ref (dict);
801   r->reader = reader;
802   r->proto = caseproto_ref (dict_get_proto (dict));
803   casereader0 = casereader_create_sequential (NULL, r->proto,
804                                              CASENUMBER_MAX,
805                                              &data_parser_casereader_class, r);
806
807   if (func)
808     casereader1 = func (casereader0, dict, ud);
809   else
810     casereader1 = casereader0;
811
812   dataset_set_dict (ds, dict);
813   dataset_set_source (ds, casereader1);
814 }
815
816
817 static struct ccase *
818 data_parser_casereader_read (struct casereader *reader UNUSED, void *r_)
819 {
820   struct data_parser_casereader *r = r_;
821   struct ccase *c = case_create (r->proto);
822   if (data_parser_parse (r->parser, r->reader, r->dict, c))
823     return c;
824   else
825     {
826       case_unref (c);
827       return NULL;
828     }
829 }
830
831 static void
832 data_parser_casereader_destroy (struct casereader *reader, void *r_)
833 {
834   struct data_parser_casereader *r = r_;
835   if (dfm_reader_error (r->reader))
836     casereader_force_error (reader);
837   dfm_close_reader (r->reader);
838   caseproto_unref (r->proto);
839   dict_unref (r->dict);
840   data_parser_destroy (r->parser);
841   free (r);
842 }
843
844 static const struct casereader_class data_parser_casereader_class =
845   {
846     data_parser_casereader_read,
847     data_parser_casereader_destroy,
848     NULL,
849     NULL,
850   };