Remove unneeded #includes.
[pspp] / src / language / data-io / data-parser.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013, 2016 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "language/data-io/data-parser.h"
20
21 #include <stdint.h>
22 #include <stdlib.h>
23
24 #include "data/casereader-provider.h"
25 #include "data/data-in.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/file-handle-def.h"
30 #include "data/settings.h"
31 #include "language/data-io/data-reader.h"
32 #include "libpspp/intern.h"
33 #include "libpspp/message.h"
34 #include "libpspp/str.h"
35 #include "libpspp/string-array.h"
36 #include "output/pivot-table.h"
37
38 #include "gl/xalloc.h"
39
40 #include "gettext.h"
41 #define N_(msgid) msgid
42 #define _(msgid) gettext (msgid)
43
44 /* Data parser for textual data like that read by DATA LIST. */
45 struct data_parser
46   {
47     enum data_parser_type type; /* Type of data to parse. */
48     int skip_records;           /* Records to skip before first real data. */
49
50     struct field *fields;       /* Fields to parse. */
51     size_t n_fields;            /* Number of fields. */
52     size_t field_allocated;     /* Number of fields spaced allocated for. */
53
54     /* DP_DELIMITED parsers only. */
55     bool span;                  /* May cases span multiple records? */
56     bool empty_line_has_field;  /* Does an empty line have an (empty) field? */
57     bool warn_missing_fields;   /* Should missing fields be considered errors? */
58     struct substring quotes;    /* Characters that can quote separators. */
59     bool quote_escape;          /* Doubled quote acts as escape? */
60     struct substring soft_seps; /* Two soft separators act like just one. */
61     struct substring hard_seps; /* Two hard separators yield empty fields. */
62     struct string any_sep;      /* Concatenation of soft_seps and hard_seps. */
63
64     /* DP_FIXED parsers only. */
65     int records_per_case;       /* Number of records in each case. */
66   };
67
68 /* How to parse one variable. */
69 struct field
70   {
71     struct fmt_spec format;     /* Input format of this field. */
72     int case_idx;               /* First value in case. */
73     char *name;                 /* Var name for error messages and tables. */
74
75     /* DP_FIXED only. */
76     int record;                 /* Record number (1-based). */
77     int first_column;           /* First column in record (1-based). */
78   };
79
80 static void set_any_sep (struct data_parser *parser);
81
82 /* Creates and returns a new data parser. */
83 struct data_parser *
84 data_parser_create (void)
85 {
86   struct data_parser *parser = xmalloc (sizeof *parser);
87   *parser = (struct data_parser) {
88     .type = DP_FIXED,
89     .span = true,
90     .warn_missing_fields = true,
91     .quotes = ss_clone (ss_cstr ("\"'")),
92     .soft_seps = ss_clone (ss_cstr (CC_SPACES)),
93     .hard_seps = ss_clone (ss_cstr (",")),
94   };
95   set_any_sep (parser);
96
97   return parser;
98 }
99
100 /* Destroys PARSER. */
101 void
102 data_parser_destroy (struct data_parser *parser)
103 {
104   if (parser != NULL)
105     {
106       size_t i;
107
108       for (i = 0; i < parser->n_fields; i++)
109         free (parser->fields[i].name);
110       free (parser->fields);
111       ss_dealloc (&parser->quotes);
112       ss_dealloc (&parser->soft_seps);
113       ss_dealloc (&parser->hard_seps);
114       ds_destroy (&parser->any_sep);
115       free (parser);
116     }
117 }
118
119 /* Returns the type of PARSER (either DP_DELIMITED or DP_FIXED). */
120 enum data_parser_type
121 data_parser_get_type (const struct data_parser *parser)
122 {
123   return parser->type;
124 }
125
126 /* Sets the type of PARSER to TYPE (either DP_DELIMITED or
127    DP_FIXED). */
128 void
129 data_parser_set_type (struct data_parser *parser, enum data_parser_type type)
130 {
131   assert (parser->n_fields == 0);
132   assert (type == DP_FIXED || type == DP_DELIMITED);
133   parser->type = type;
134 }
135
136 /* Configures PARSER to skip the specified number of
137    INITIAL_RECORDS_TO_SKIP before parsing any data.  By default,
138    no records are skipped. */
139 void
140 data_parser_set_skip (struct data_parser *parser, int initial_records_to_skip)
141 {
142   assert (initial_records_to_skip >= 0);
143   parser->skip_records = initial_records_to_skip;
144 }
145
146 /* Returns true if PARSER is configured to allow cases to span
147    multiple records. */
148 bool
149 data_parser_get_span (const struct data_parser *parser)
150 {
151   return parser->span;
152 }
153
154 /* If MAY_CASES_SPAN_RECORDS is true, configures PARSER to allow
155    a single case to span multiple records and multiple cases to
156    occupy a single record.  If MAY_CASES_SPAN_RECORDS is false,
157    configures PARSER to require each record to contain exactly
158    one case.
159
160    This setting affects parsing of DP_DELIMITED files only. */
161 void
162 data_parser_set_span (struct data_parser *parser, bool may_cases_span_records)
163 {
164   parser->span = may_cases_span_records;
165 }
166
167 /* If EMPTY_LINE_HAS_FIELD is true, configures PARSER to parse an
168    empty line as an empty field and to treat a hard delimiter
169    followed by end-of-line as an empty field.  If
170    EMPTY_LINE_HAS_FIELD is false, PARSER will skip empty lines
171    and hard delimiters at the end of lines without emitting empty
172    fields.
173
174    This setting affects parsing of DP_DELIMITED files only. */
175 void
176 data_parser_set_empty_line_has_field (struct data_parser *parser,
177                                       bool empty_line_has_field)
178 {
179   parser->empty_line_has_field = empty_line_has_field;
180 }
181
182
183 /* If WARN_MISSING_FIELDS is true, configures PARSER to emit a warning
184    and cause an error condition when a missing field is encountered.
185    If  WARN_MISSING_FIELDS is false, PARSER will silently fill such
186    fields with the system missing value.
187
188    This setting affects parsing of DP_DELIMITED files only. */
189 void
190 data_parser_set_warn_missing_fields (struct data_parser *parser,
191                                      bool warn_missing_fields)
192 {
193   parser->warn_missing_fields = warn_missing_fields;
194 }
195
196
197 /* Sets the characters that may be used for quoting field
198    contents to QUOTES.  If QUOTES is empty, quoting will be
199    disabled.
200
201    The caller retains ownership of QUOTES.
202
203    This setting affects parsing of DP_DELIMITED files only. */
204 void
205 data_parser_set_quotes (struct data_parser *parser, struct substring quotes)
206 {
207   ss_dealloc (&parser->quotes);
208   parser->quotes = ss_clone (quotes);
209 }
210
211 /* If ESCAPE is false (the default setting), a character used for
212    quoting cannot itself be embedded within a quoted field.  If
213    ESCAPE is true, then a quote character can be embedded within
214    a quoted field by doubling it.
215
216    This setting affects parsing of DP_DELIMITED files only, and
217    only when at least one quote character has been set (with
218    data_parser_set_quotes). */
219 void
220 data_parser_set_quote_escape (struct data_parser *parser, bool escape)
221 {
222   parser->quote_escape = escape;
223 }
224
225 /* Sets PARSER's soft delimiters to DELIMITERS.  Soft delimiters
226    separate fields, but consecutive soft delimiters do not yield
227    empty fields.  (Ordinarily, only white space characters are
228    appropriate soft delimiters.)
229
230    The caller retains ownership of DELIMITERS.
231
232    This setting affects parsing of DP_DELIMITED files only. */
233 void
234 data_parser_set_soft_delimiters (struct data_parser *parser,
235                                  struct substring delimiters)
236 {
237   ss_dealloc (&parser->soft_seps);
238   parser->soft_seps = ss_clone (delimiters);
239   set_any_sep (parser);
240 }
241
242 /* Sets PARSER's hard delimiters to DELIMITERS.  Hard delimiters
243    separate fields.  A consecutive pair of hard delimiters yield
244    an empty field.
245
246    The caller retains ownership of DELIMITERS.
247
248    This setting affects parsing of DP_DELIMITED files only. */
249 void
250 data_parser_set_hard_delimiters (struct data_parser *parser,
251                                  struct substring delimiters)
252 {
253   ss_dealloc (&parser->hard_seps);
254   parser->hard_seps = ss_clone (delimiters);
255   set_any_sep (parser);
256 }
257
258 /* Returns the number of records per case. */
259 int
260 data_parser_get_records (const struct data_parser *parser)
261 {
262   return parser->records_per_case;
263 }
264
265 /* Sets the number of records per case to RECORDS_PER_CASE.
266
267    This setting affects parsing of DP_FIXED files only. */
268 void
269 data_parser_set_records (struct data_parser *parser, int records_per_case)
270 {
271   assert (records_per_case >= 0);
272   assert (records_per_case >= parser->records_per_case);
273   parser->records_per_case = records_per_case;
274 }
275
276 static void
277 add_field (struct data_parser *p, const struct fmt_spec *format, int case_idx,
278            const char *name, int record, int first_column)
279 {
280   struct field *field;
281
282   if (p->n_fields == p->field_allocated)
283     p->fields = x2nrealloc (p->fields, &p->field_allocated, sizeof *p->fields);
284   field = &p->fields[p->n_fields++];
285   field->format = *format;
286   field->case_idx = case_idx;
287   field->name = xstrdup (name);
288   field->record = record;
289   field->first_column = first_column;
290 }
291
292 /* Adds a delimited field to the field parsed by PARSER, which
293    must be configured as a DP_DELIMITED parser.  The field is
294    parsed as input format FORMAT.  Its data will be stored into case
295    index CASE_INDEX.  Errors in input data will be reported
296    against variable NAME. */
297 void
298 data_parser_add_delimited_field (struct data_parser *parser,
299                                  const struct fmt_spec *format, int case_idx,
300                                  const char *name)
301 {
302   assert (parser->type == DP_DELIMITED);
303   add_field (parser, format, case_idx, name, 0, 0);
304 }
305
306 /* Adds a fixed field to the field parsed by PARSER, which
307    must be configured as a DP_FIXED parser.  The field is
308    parsed as input format FORMAT.  Its data will be stored into case
309    index CASE_INDEX.  Errors in input data will be reported
310    against variable NAME.  The field will be drawn from the
311    FORMAT->w columns in 1-based RECORD starting at 1-based
312    column FIRST_COLUMN.
313
314    RECORD must be at least as great as that of any field already
315    added; that is, fields must be added in increasing order of
316    record number.  If RECORD is greater than the current number
317    of records per case, the number of records per case are
318    increased as needed.  */
319 void
320 data_parser_add_fixed_field (struct data_parser *parser,
321                              const struct fmt_spec *format, int case_idx,
322                              const char *name,
323                              int record, int first_column)
324 {
325   assert (parser->type == DP_FIXED);
326   assert (parser->n_fields == 0
327           || record >= parser->fields[parser->n_fields - 1].record);
328   if (record > parser->records_per_case)
329     parser->records_per_case = record;
330   add_field (parser, format, case_idx, name, record, first_column);
331 }
332
333 /* Returns true if any fields have been added to PARSER, false
334    otherwise. */
335 bool
336 data_parser_any_fields (const struct data_parser *parser)
337 {
338   return parser->n_fields > 0;
339 }
340
341 static void
342 set_any_sep (struct data_parser *parser)
343 {
344   ds_assign_substring (&parser->any_sep, parser->soft_seps);
345   ds_put_substring (&parser->any_sep, parser->hard_seps);
346 }
347 \f
348 static bool parse_delimited_span (const struct data_parser *,
349                                   struct dfm_reader *,
350                                   struct dictionary *, struct ccase *);
351 static bool parse_delimited_no_span (const struct data_parser *,
352                                      struct dfm_reader *,
353                                      struct dictionary *, struct ccase *);
354 static bool parse_fixed (const struct data_parser *, struct dfm_reader *,
355                          struct dictionary *, struct ccase *);
356
357 /* Reads a case from DFM into C, which matches dictionary DICT, parsing it with
358    PARSER.  Returns true if successful, false at end of file or on I/O error.
359
360    Case C must not be shared. */
361 bool
362 data_parser_parse (struct data_parser *parser, struct dfm_reader *reader,
363                    struct dictionary *dict, struct ccase *c)
364 {
365   bool retval;
366
367   assert (!case_is_shared (c));
368   assert (data_parser_any_fields (parser));
369
370   /* Skip the requested number of records before reading the
371      first case. */
372   for (; parser->skip_records > 0; parser->skip_records--)
373     {
374       if (dfm_eof (reader))
375         return false;
376       dfm_forward_record (reader);
377     }
378
379   /* Limit cases. */
380   if (parser->type == DP_DELIMITED)
381     {
382       if (parser->span)
383         retval = parse_delimited_span (parser, reader, dict, c);
384       else
385         retval = parse_delimited_no_span (parser, reader, dict, c);
386     }
387   else
388     retval = parse_fixed (parser, reader, dict, c);
389
390   return retval;
391 }
392
393 static void
394 cut_field__ (const struct data_parser *parser, const struct substring *line,
395              struct substring *p, size_t *n_columns,
396              struct string *tmp, struct substring *field)
397 {
398   bool quoted = ss_find_byte (parser->quotes, ss_first (*p)) != SIZE_MAX;
399   if (quoted)
400     {
401       /* Quoted field. */
402       int quote = ss_get_byte (p);
403       if (!ss_get_until (p, quote, field))
404         msg (DW, _("Quoted string extends beyond end of line."));
405       if (parser->quote_escape && ss_first (*p) == quote)
406         {
407           ds_assign_substring (tmp, *field);
408           while (ss_match_byte (p, quote))
409             {
410               struct substring ss;
411               ds_put_byte (tmp, quote);
412               if (!ss_get_until (p, quote, &ss))
413                 msg (DW, _("Quoted string extends beyond end of line."));
414               ds_put_substring (tmp, ss);
415             }
416           *field = ds_ss (tmp);
417         }
418       *n_columns = ss_length (*line) - ss_length (*p);
419     }
420   else
421     {
422       /* Regular field. */
423       ss_get_bytes (p, ss_cspan (*p, ds_ss (&parser->any_sep)), field);
424       *n_columns = ss_length (*field);
425     }
426
427   /* Skip trailing soft separator and a single hard separator if present. */
428   size_t length_before_separators = ss_length (*p);
429   ss_ltrim (p, parser->soft_seps);
430   if (!ss_is_empty (*p)
431       && ss_find_byte (parser->hard_seps, ss_first (*p)) != SIZE_MAX)
432     {
433       ss_advance (p, 1);
434       ss_ltrim (p, parser->soft_seps);
435     }
436
437   if (!ss_is_empty (*p) && quoted && length_before_separators == ss_length (*p))
438     msg (DW, _("Missing delimiter following quoted string."));
439 }
440
441 /* Extracts a delimited field from the current position in the
442    current record according to PARSER, reading data from READER.
443
444    *FIELD is set to the field content.  The caller must not or
445    destroy this constant string.
446
447    Sets *FIRST_COLUMN to the 1-based column number of the start of
448    the extracted field, and *LAST_COLUMN to the end of the extracted
449    field.
450
451    Returns true on success, false on failure. */
452 static bool
453 cut_field (const struct data_parser *parser, struct dfm_reader *reader,
454            int *first_column, int *last_column, struct string *tmp,
455            struct substring *field)
456 {
457   struct substring line, p;
458
459   if (dfm_eof (reader))
460     return false;
461   if (ss_is_empty (parser->hard_seps))
462     dfm_expand_tabs (reader);
463   line = p = dfm_get_record (reader);
464
465   /* Skip leading soft separators. */
466   ss_ltrim (&p, parser->soft_seps);
467
468   /* Handle empty or completely consumed lines. */
469   if (ss_is_empty (p))
470     {
471       if (!parser->empty_line_has_field || dfm_columns_past_end (reader) > 0)
472         return false;
473       else
474         {
475           *field = p;
476           *first_column = dfm_column_start (reader);
477           *last_column = *first_column + 1;
478           dfm_forward_columns (reader, 1);
479           return true;
480         }
481     }
482
483   size_t n_columns;
484   cut_field__ (parser, &line, &p, &n_columns, tmp, field);
485   *first_column = dfm_column_start (reader);
486   *last_column = *first_column + n_columns;
487
488   if (ss_is_empty (p))
489     dfm_forward_columns (reader, 1);
490   dfm_forward_columns (reader, ss_length (line) - ss_length (p));
491
492   return true;
493 }
494
495 static void
496 parse_error (const struct dfm_reader *reader, const struct field *field,
497              int first_column, int last_column, char *error)
498 {
499   int line_number = dfm_get_line_number (reader);
500   struct msg_location *location = xmalloc (sizeof *location);
501   *location = (struct msg_location) {
502     .file_name = intern_new (dfm_get_file_name (reader)),
503     .start = { .line = line_number, .column = first_column },
504     .end = { .line = line_number, .column = last_column - 1 },
505   };
506   struct msg *m = xmalloc (sizeof *m);
507   *m = (struct msg) {
508     .category = MSG_C_DATA,
509     .severity = MSG_S_WARNING,
510     .location = location,
511     .text = xasprintf (_("Data for variable %s is not valid as format %s: %s"),
512                        field->name, fmt_name (field->format.type), error),
513   };
514   msg_emit (m);
515
516   free (error);
517 }
518
519 /* Reads a case from READER into C, which matches DICT, parsing it according to
520    fixed-format syntax rules in PARSER.  Returns true if successful, false at
521    end of file or on I/O error. */
522 static bool
523 parse_fixed (const struct data_parser *parser, struct dfm_reader *reader,
524              struct dictionary *dict, struct ccase *c)
525 {
526   const char *input_encoding = dfm_reader_get_encoding (reader);
527   const char *output_encoding = dict_get_encoding (dict);
528   struct field *f;
529   int row;
530
531   if (dfm_eof (reader))
532     return false;
533
534   f = parser->fields;
535   for (row = 1; row <= parser->records_per_case; row++)
536     {
537       struct substring line;
538
539       if (dfm_eof (reader))
540         {
541           msg (DW, _("Partial case of %d of %d records discarded."),
542                row - 1, parser->records_per_case);
543           return false;
544         }
545       dfm_expand_tabs (reader);
546       line = dfm_get_record (reader);
547
548       for (; f < &parser->fields[parser->n_fields] && f->record == row; f++)
549         {
550           struct substring s = ss_substr (line, f->first_column - 1,
551                                           f->format.w);
552           union value *value = case_data_rw_idx (c, f->case_idx);
553           char *error = data_in (s, input_encoding, f->format.type,
554                                  settings_get_fmt_settings (),
555                                  value, fmt_var_width (&f->format),
556                                  output_encoding);
557
558           if (error == NULL)
559             data_in_imply_decimals (s, input_encoding, f->format.type,
560                                     f->format.d, settings_get_fmt_settings (),
561                                     value);
562           else
563             parse_error (reader, f, f->first_column,
564                          f->first_column + f->format.w, error);
565         }
566
567       dfm_forward_record (reader);
568     }
569
570   return true;
571 }
572
573 /* Splits the data line in LINE into individual text fields and returns the
574    number of fields.  If SA is nonnull, appends each field to SA; the caller
575    retains ownership of SA and its contents.  */
576 size_t
577 data_parser_split (const struct data_parser *parser,
578                    struct substring line, struct string_array *sa)
579 {
580   size_t n = 0;
581
582   struct string tmp = DS_EMPTY_INITIALIZER;
583   for (;;)
584     {
585       struct substring p = line;
586       ss_ltrim (&p, parser->soft_seps);
587       if (ss_is_empty (p))
588         {
589           ds_destroy (&tmp);
590           return n;
591         }
592
593       size_t n_columns;
594       struct substring field;
595
596       msg_disable ();
597       cut_field__ (parser, &line, &p, &n_columns, &tmp, &field);
598       msg_enable ();
599
600       if (sa)
601         string_array_append_nocopy (sa, ss_xstrdup (field));
602       n++;
603       line = p;
604     }
605 }
606
607 /* Reads a case from READER into C, which matches dictionary DICT, parsing it
608    according to free-format syntax rules in PARSER.  Returns true if
609    successful, false at end of file or on I/O error. */
610 static bool
611 parse_delimited_span (const struct data_parser *parser,
612                       struct dfm_reader *reader,
613                       struct dictionary *dict, struct ccase *c)
614 {
615   const char *output_encoding = dict_get_encoding (dict);
616   struct string tmp = DS_EMPTY_INITIALIZER;
617   struct field *f;
618
619   for (f = parser->fields; f < &parser->fields[parser->n_fields]; f++)
620     {
621       struct substring s;
622       int first_column, last_column;
623       char *error;
624
625       /* Cut out a field and read in a new record if necessary. */
626       while (!cut_field (parser, reader,
627                          &first_column, &last_column, &tmp, &s))
628         {
629           if (!dfm_eof (reader))
630             dfm_forward_record (reader);
631           if (dfm_eof (reader))
632             {
633               if (f > parser->fields)
634                 msg (DW, _("Partial case discarded.  The first variable "
635                            "missing was %s."), f->name);
636               ds_destroy (&tmp);
637               return false;
638             }
639         }
640
641       const char *input_encoding = dfm_reader_get_encoding (reader);
642       error = data_in (s, input_encoding, f->format.type,
643                        settings_get_fmt_settings (),
644                        case_data_rw_idx (c, f->case_idx),
645                        fmt_var_width (&f->format), output_encoding);
646       if (error != NULL)
647         parse_error (reader, f, first_column, last_column, error);
648     }
649   ds_destroy (&tmp);
650   return true;
651 }
652
653 /* Reads a case from READER into C, which matches dictionary DICT, parsing it
654    according to delimited syntax rules with one case per record in PARSER.
655    Returns true if successful, false at end of file or on I/O error. */
656 static bool
657 parse_delimited_no_span (const struct data_parser *parser,
658                          struct dfm_reader *reader,
659                          struct dictionary *dict, struct ccase *c)
660 {
661   const char *output_encoding = dict_get_encoding (dict);
662   struct string tmp = DS_EMPTY_INITIALIZER;
663   struct substring s;
664   struct field *f, *end;
665
666   if (dfm_eof (reader))
667     return false;
668
669   end = &parser->fields[parser->n_fields];
670   for (f = parser->fields; f < end; f++)
671     {
672       int first_column, last_column;
673       char *error;
674
675       if (!cut_field (parser, reader, &first_column, &last_column, &tmp, &s))
676         {
677           if (f < end - 1 && settings_get_undefined () && parser->warn_missing_fields)
678             msg (DW, _("Missing value(s) for all variables from %s onward.  "
679                        "These will be filled with the system-missing value "
680                        "or blanks, as appropriate."),
681                  f->name);
682           for (; f < end; f++)
683             value_set_missing (case_data_rw_idx (c, f->case_idx),
684                                fmt_var_width (&f->format));
685           goto exit;
686         }
687
688       const char *input_encoding = dfm_reader_get_encoding (reader);
689       error = data_in (s, input_encoding, f->format.type,
690                        settings_get_fmt_settings (),
691                        case_data_rw_idx (c, f->case_idx),
692                        fmt_var_width (&f->format), output_encoding);
693       if (error != NULL)
694         parse_error (reader, f, first_column, last_column, error);
695     }
696
697   s = dfm_get_record (reader);
698   ss_ltrim (&s, parser->soft_seps);
699   if (!ss_is_empty (s))
700     msg (DW, _("Record ends in data not part of any field."));
701
702 exit:
703   dfm_forward_record (reader);
704   ds_destroy (&tmp);
705   return true;
706 }
707 \f
708 /* Displays a table giving information on fixed-format variable
709    parsing on DATA LIST. */
710 static void
711 dump_fixed_table (const struct data_parser *parser,
712                   const struct file_handle *fh)
713 {
714   /* XXX This should not be preformatted. */
715   char *title = xasprintf (ngettext ("Reading %d record from %s.",
716                                      "Reading %d records from %s.",
717                                      parser->records_per_case),
718                            parser->records_per_case, fh_get_name (fh));
719   struct pivot_table *table = pivot_table_create__ (
720     pivot_value_new_user_text (title, -1), "Fixed Data Records");
721   free (title);
722
723   pivot_dimension_create (
724     table, PIVOT_AXIS_COLUMN, N_("Attributes"),
725     N_("Record"), N_("Columns"), N_("Format"));
726
727   struct pivot_dimension *variables = pivot_dimension_create (
728     table, PIVOT_AXIS_ROW, N_("Variable"));
729   variables->root->show_label = true;
730   for (size_t i = 0; i < parser->n_fields; i++)
731     {
732       struct field *f = &parser->fields[i];
733
734       /* XXX It would be better to have the actual variable here. */
735       int variable_idx = pivot_category_create_leaf (
736         variables->root, pivot_value_new_user_text (f->name, -1));
737
738       pivot_table_put2 (table, 0, variable_idx,
739                         pivot_value_new_integer (f->record));
740
741       int first_column = f->first_column;
742       int last_column = f->first_column + f->format.w - 1;
743       char *columns = xasprintf ("%d-%d", first_column, last_column);
744       pivot_table_put2 (table, 1, variable_idx,
745                         pivot_value_new_user_text (columns, -1));
746       free (columns);
747
748       char str[FMT_STRING_LEN_MAX + 1];
749       pivot_table_put2 (table, 2, variable_idx,
750                         pivot_value_new_user_text (
751                           fmt_to_string (&f->format, str), -1));
752
753     }
754
755   pivot_table_submit (table);
756 }
757
758 /* Displays a table giving information on free-format variable parsing
759    on DATA LIST. */
760 static void
761 dump_delimited_table (const struct data_parser *parser,
762                       const struct file_handle *fh)
763 {
764   struct pivot_table *table = pivot_table_create__ (
765     pivot_value_new_text_format (N_("Reading free-form data from %s."),
766                                  fh_get_name (fh)),
767     "Free-Form Data Records");
768
769   pivot_dimension_create (
770     table, PIVOT_AXIS_COLUMN, N_("Attributes"), N_("Format"));
771
772   struct pivot_dimension *variables = pivot_dimension_create (
773     table, PIVOT_AXIS_ROW, N_("Variable"));
774   variables->root->show_label = true;
775   for (size_t i = 0; i < parser->n_fields; i++)
776     {
777       struct field *f = &parser->fields[i];
778
779       /* XXX It would be better to have the actual variable here. */
780       int variable_idx = pivot_category_create_leaf (
781         variables->root, pivot_value_new_user_text (f->name, -1));
782
783       char str[FMT_STRING_LEN_MAX + 1];
784       pivot_table_put2 (table, 0, variable_idx,
785                         pivot_value_new_user_text (
786                           fmt_to_string (&f->format, str), -1));
787     }
788
789   pivot_table_submit (table);
790 }
791
792 /* Displays a table giving information on how PARSER will read
793    data from FH. */
794 void
795 data_parser_output_description (struct data_parser *parser,
796                                 const struct file_handle *fh)
797 {
798   if (parser->type == DP_FIXED)
799     dump_fixed_table (parser, fh);
800   else
801     dump_delimited_table (parser, fh);
802 }
803 \f
804 /* Data parser input program. */
805 struct data_parser_casereader
806   {
807     struct data_parser *parser; /* Parser. */
808     struct dictionary *dict;    /* Dictionary. */
809     struct dfm_reader *reader;  /* Data file reader. */
810     struct caseproto *proto;    /* Format of cases. */
811   };
812
813 static const struct casereader_class data_parser_casereader_class;
814
815 /* Replaces DS's active dataset by an input program that reads data
816    from READER according to the rules in PARSER, using DICT as
817    the underlying dictionary.  Ownership of PARSER and READER is
818    transferred to the input program, and ownership of DICT is
819    transferred to the dataset. */
820 void
821 data_parser_make_active_file (struct data_parser *parser, struct dataset *ds,
822                                struct dfm_reader *reader,
823                               struct dictionary *dict,
824                                struct casereader* (*func)(struct casereader *,
825                                                           const struct dictionary *,
826                                                           void *),
827                                void *ud)
828 {
829   struct data_parser_casereader *r;
830   struct casereader *casereader0;
831   struct casereader *casereader1;
832
833   r = xmalloc (sizeof *r);
834   r->parser = parser;
835   r->dict = dict_ref (dict);
836   r->reader = reader;
837   r->proto = caseproto_ref (dict_get_proto (dict));
838   casereader0 = casereader_create_sequential (NULL, r->proto,
839                                              CASENUMBER_MAX,
840                                              &data_parser_casereader_class, r);
841
842   if (func)
843     casereader1 = func (casereader0, dict, ud);
844   else
845     casereader1 = casereader0;
846
847   dataset_set_dict (ds, dict);
848   dataset_set_source (ds, casereader1);
849 }
850
851
852 static struct ccase *
853 data_parser_casereader_read (struct casereader *reader UNUSED, void *r_)
854 {
855   struct data_parser_casereader *r = r_;
856   struct ccase *c = case_create (r->proto);
857   if (data_parser_parse (r->parser, r->reader, r->dict, c))
858     return c;
859   else
860     {
861       case_unref (c);
862       return NULL;
863     }
864 }
865
866 static void
867 data_parser_casereader_destroy (struct casereader *reader, void *r_)
868 {
869   struct data_parser_casereader *r = r_;
870   if (dfm_reader_error (r->reader))
871     casereader_force_error (reader);
872   dfm_close_reader (r->reader);
873   caseproto_unref (r->proto);
874   dict_unref (r->dict);
875   data_parser_destroy (r->parser);
876   free (r);
877 }
878
879 static const struct casereader_class data_parser_casereader_class =
880   {
881     data_parser_casereader_read,
882     data_parser_casereader_destroy,
883     NULL,
884     NULL,
885   };