message: Break message location out into a separate struct.
[pspp] / src / language / data-io / data-parser.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013, 2016 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "language/data-io/data-parser.h"
20
21 #include <stdint.h>
22 #include <stdlib.h>
23
24 #include "data/casereader-provider.h"
25 #include "data/data-in.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/file-handle-def.h"
30 #include "data/settings.h"
31 #include "language/data-io/data-reader.h"
32 #include "libpspp/message.h"
33 #include "libpspp/str.h"
34 #include "output/pivot-table.h"
35
36 #include "gl/xalloc.h"
37
38 #include "gettext.h"
39 #define N_(msgid) msgid
40 #define _(msgid) gettext (msgid)
41
42 /* Data parser for textual data like that read by DATA LIST. */
43 struct data_parser
44   {
45     struct dictionary *dict;    /* Dictionary of destination */
46     enum data_parser_type type; /* Type of data to parse. */
47     int skip_records;           /* Records to skip before first real data. */
48
49     struct field *fields;       /* Fields to parse. */
50     size_t field_cnt;           /* Number of fields. */
51     size_t field_allocated;     /* Number of fields spaced allocated for. */
52
53     /* DP_DELIMITED parsers only. */
54     bool span;                  /* May cases span multiple records? */
55     bool empty_line_has_field;  /* Does an empty line have an (empty) field? */
56     bool warn_missing_fields;   /* Should missing fields be considered errors? */
57     struct substring quotes;    /* Characters that can quote separators. */
58     bool quote_escape;          /* Doubled quote acts as escape? */
59     struct substring soft_seps; /* Two soft separators act like just one. */
60     struct substring hard_seps; /* Two hard separators yield empty fields. */
61     struct string any_sep;      /* Concatenation of soft_seps and hard_seps. */
62
63     /* DP_FIXED parsers only. */
64     int records_per_case;       /* Number of records in each case. */
65   };
66
67 /* How to parse one variable. */
68 struct field
69   {
70     struct fmt_spec format;     /* Input format of this field. */
71     int case_idx;               /* First value in case. */
72     char *name;                 /* Var name for error messages and tables. */
73
74     /* DP_FIXED only. */
75     int record;                 /* Record number (1-based). */
76     int first_column;           /* First column in record (1-based). */
77   };
78
79 static void set_any_sep (struct data_parser *parser);
80
81 /* Creates and returns a new data parser. */
82 struct data_parser *
83 data_parser_create (struct dictionary *dict)
84 {
85   struct data_parser *parser = xmalloc (sizeof *parser);
86
87   parser->type = DP_FIXED;
88   parser->skip_records = 0;
89
90   parser->fields = NULL;
91   parser->field_cnt = 0;
92   parser->field_allocated = 0;
93   parser->dict = dict_ref (dict);
94
95   parser->span = true;
96   parser->empty_line_has_field = false;
97   parser->warn_missing_fields = true;
98   ss_alloc_substring (&parser->quotes, ss_cstr ("\"'"));
99   parser->quote_escape = false;
100   ss_alloc_substring (&parser->soft_seps, ss_cstr (CC_SPACES));
101   ss_alloc_substring (&parser->hard_seps, ss_cstr (","));
102   ds_init_empty (&parser->any_sep);
103   set_any_sep (parser);
104
105   parser->records_per_case = 0;
106
107   return parser;
108 }
109
110 /* Destroys PARSER. */
111 void
112 data_parser_destroy (struct data_parser *parser)
113 {
114   if (parser != NULL)
115     {
116       size_t i;
117
118       dict_unref (parser->dict);
119       for (i = 0; i < parser->field_cnt; i++)
120         free (parser->fields[i].name);
121       free (parser->fields);
122       ss_dealloc (&parser->quotes);
123       ss_dealloc (&parser->soft_seps);
124       ss_dealloc (&parser->hard_seps);
125       ds_destroy (&parser->any_sep);
126       free (parser);
127     }
128 }
129
130 /* Returns the type of PARSER (either DP_DELIMITED or DP_FIXED). */
131 enum data_parser_type
132 data_parser_get_type (const struct data_parser *parser)
133 {
134   return parser->type;
135 }
136
137 /* Sets the type of PARSER to TYPE (either DP_DELIMITED or
138    DP_FIXED). */
139 void
140 data_parser_set_type (struct data_parser *parser, enum data_parser_type type)
141 {
142   assert (parser->field_cnt == 0);
143   assert (type == DP_FIXED || type == DP_DELIMITED);
144   parser->type = type;
145 }
146
147 /* Configures PARSER to skip the specified number of
148    INITIAL_RECORDS_TO_SKIP before parsing any data.  By default,
149    no records are skipped. */
150 void
151 data_parser_set_skip (struct data_parser *parser, int initial_records_to_skip)
152 {
153   assert (initial_records_to_skip >= 0);
154   parser->skip_records = initial_records_to_skip;
155 }
156
157 /* Returns true if PARSER is configured to allow cases to span
158    multiple records. */
159 bool
160 data_parser_get_span (const struct data_parser *parser)
161 {
162   return parser->span;
163 }
164
165 /* If MAY_CASES_SPAN_RECORDS is true, configures PARSER to allow
166    a single case to span multiple records and multiple cases to
167    occupy a single record.  If MAY_CASES_SPAN_RECORDS is false,
168    configures PARSER to require each record to contain exactly
169    one case.
170
171    This setting affects parsing of DP_DELIMITED files only. */
172 void
173 data_parser_set_span (struct data_parser *parser, bool may_cases_span_records)
174 {
175   parser->span = may_cases_span_records;
176 }
177
178 /* If EMPTY_LINE_HAS_FIELD is true, configures PARSER to parse an
179    empty line as an empty field and to treat a hard delimiter
180    followed by end-of-line as an empty field.  If
181    EMPTY_LINE_HAS_FIELD is false, PARSER will skip empty lines
182    and hard delimiters at the end of lines without emitting empty
183    fields.
184
185    This setting affects parsing of DP_DELIMITED files only. */
186 void
187 data_parser_set_empty_line_has_field (struct data_parser *parser,
188                                       bool empty_line_has_field)
189 {
190   parser->empty_line_has_field = empty_line_has_field;
191 }
192
193
194 /* If WARN_MISSING_FIELDS is true, configures PARSER to emit a warning
195    and cause an error condition when a missing field is encountered.
196    If  WARN_MISSING_FIELDS is false, PARSER will silently fill such
197    fields with the system missing value.
198
199    This setting affects parsing of DP_DELIMITED files only. */
200 void
201 data_parser_set_warn_missing_fields (struct data_parser *parser,
202                                      bool warn_missing_fields)
203 {
204   parser->warn_missing_fields = warn_missing_fields;
205 }
206
207
208 /* Sets the characters that may be used for quoting field
209    contents to QUOTES.  If QUOTES is empty, quoting will be
210    disabled.
211
212    The caller retains ownership of QUOTES.
213
214    This setting affects parsing of DP_DELIMITED files only. */
215 void
216 data_parser_set_quotes (struct data_parser *parser, struct substring quotes)
217 {
218   ss_dealloc (&parser->quotes);
219   ss_alloc_substring (&parser->quotes, quotes);
220 }
221
222 /* If ESCAPE is false (the default setting), a character used for
223    quoting cannot itself be embedded within a quoted field.  If
224    ESCAPE is true, then a quote character can be embedded within
225    a quoted field by doubling it.
226
227    This setting affects parsing of DP_DELIMITED files only, and
228    only when at least one quote character has been set (with
229    data_parser_set_quotes). */
230 void
231 data_parser_set_quote_escape (struct data_parser *parser, bool escape)
232 {
233   parser->quote_escape = escape;
234 }
235
236 /* Sets PARSER's soft delimiters to DELIMITERS.  Soft delimiters
237    separate fields, but consecutive soft delimiters do not yield
238    empty fields.  (Ordinarily, only white space characters are
239    appropriate soft delimiters.)
240
241    The caller retains ownership of DELIMITERS.
242
243    This setting affects parsing of DP_DELIMITED files only. */
244 void
245 data_parser_set_soft_delimiters (struct data_parser *parser,
246                                  struct substring delimiters)
247 {
248   ss_dealloc (&parser->soft_seps);
249   ss_alloc_substring (&parser->soft_seps, delimiters);
250   set_any_sep (parser);
251 }
252
253 /* Sets PARSER's hard delimiters to DELIMITERS.  Hard delimiters
254    separate fields.  A consecutive pair of hard delimiters yield
255    an empty field.
256
257    The caller retains ownership of DELIMITERS.
258
259    This setting affects parsing of DP_DELIMITED files only. */
260 void
261 data_parser_set_hard_delimiters (struct data_parser *parser,
262                                  struct substring delimiters)
263 {
264   ss_dealloc (&parser->hard_seps);
265   ss_alloc_substring (&parser->hard_seps, delimiters);
266   set_any_sep (parser);
267 }
268
269 /* Returns the number of records per case. */
270 int
271 data_parser_get_records (const struct data_parser *parser)
272 {
273   return parser->records_per_case;
274 }
275
276 /* Sets the number of records per case to RECORDS_PER_CASE.
277
278    This setting affects parsing of DP_FIXED files only. */
279 void
280 data_parser_set_records (struct data_parser *parser, int records_per_case)
281 {
282   assert (records_per_case >= 0);
283   assert (records_per_case >= parser->records_per_case);
284   parser->records_per_case = records_per_case;
285 }
286
287 static void
288 add_field (struct data_parser *p, const struct fmt_spec *format, int case_idx,
289            const char *name, int record, int first_column)
290 {
291   struct field *field;
292
293   if (p->field_cnt == p->field_allocated)
294     p->fields = x2nrealloc (p->fields, &p->field_allocated, sizeof *p->fields);
295   field = &p->fields[p->field_cnt++];
296   field->format = *format;
297   field->case_idx = case_idx;
298   field->name = xstrdup (name);
299   field->record = record;
300   field->first_column = first_column;
301 }
302
303 /* Adds a delimited field to the field parsed by PARSER, which
304    must be configured as a DP_DELIMITED parser.  The field is
305    parsed as input format FORMAT.  Its data will be stored into case
306    index CASE_INDEX.  Errors in input data will be reported
307    against variable NAME. */
308 void
309 data_parser_add_delimited_field (struct data_parser *parser,
310                                  const struct fmt_spec *format, int case_idx,
311                                  const char *name)
312 {
313   assert (parser->type == DP_DELIMITED);
314   add_field (parser, format, case_idx, name, 0, 0);
315 }
316
317 /* Adds a fixed field to the field parsed by PARSER, which
318    must be configured as a DP_FIXED parser.  The field is
319    parsed as input format FORMAT.  Its data will be stored into case
320    index CASE_INDEX.  Errors in input data will be reported
321    against variable NAME.  The field will be drawn from the
322    FORMAT->w columns in 1-based RECORD starting at 1-based
323    column FIRST_COLUMN.
324
325    RECORD must be at least as great as that of any field already
326    added; that is, fields must be added in increasing order of
327    record number.  If RECORD is greater than the current number
328    of records per case, the number of records per case are
329    increased as needed.  */
330 void
331 data_parser_add_fixed_field (struct data_parser *parser,
332                              const struct fmt_spec *format, int case_idx,
333                              const char *name,
334                              int record, int first_column)
335 {
336   assert (parser->type == DP_FIXED);
337   assert (parser->field_cnt == 0
338           || record >= parser->fields[parser->field_cnt - 1].record);
339   if (record > parser->records_per_case)
340     parser->records_per_case = record;
341   add_field (parser, format, case_idx, name, record, first_column);
342 }
343
344 /* Returns true if any fields have been added to PARSER, false
345    otherwise. */
346 bool
347 data_parser_any_fields (const struct data_parser *parser)
348 {
349   return parser->field_cnt > 0;
350 }
351
352 static void
353 set_any_sep (struct data_parser *parser)
354 {
355   ds_assign_substring (&parser->any_sep, parser->soft_seps);
356   ds_put_substring (&parser->any_sep, parser->hard_seps);
357 }
358 \f
359 static bool parse_delimited_span (const struct data_parser *,
360                                   struct dfm_reader *, struct ccase *);
361 static bool parse_delimited_no_span (const struct data_parser *,
362                                      struct dfm_reader *, struct ccase *);
363 static bool parse_fixed (const struct data_parser *,
364                          struct dfm_reader *, struct ccase *);
365
366 /* Reads a case from DFM into C, parsing it with PARSER.  Returns
367    true if successful, false at end of file or on I/O error.
368
369    Case C must not be shared. */
370 bool
371 data_parser_parse (struct data_parser *parser, struct dfm_reader *reader,
372                    struct ccase *c)
373 {
374   bool retval;
375
376   assert (!case_is_shared (c));
377   assert (data_parser_any_fields (parser));
378
379   /* Skip the requested number of records before reading the
380      first case. */
381   for (; parser->skip_records > 0; parser->skip_records--)
382     {
383       if (dfm_eof (reader))
384         return false;
385       dfm_forward_record (reader);
386     }
387
388   /* Limit cases. */
389   if (parser->type == DP_DELIMITED)
390     {
391       if (parser->span)
392         retval = parse_delimited_span (parser, reader, c);
393       else
394         retval = parse_delimited_no_span (parser, reader, c);
395     }
396   else
397     retval = parse_fixed (parser, reader, c);
398
399   return retval;
400 }
401
402 /* Extracts a delimited field from the current position in the
403    current record according to PARSER, reading data from READER.
404
405    *FIELD is set to the field content.  The caller must not or
406    destroy this constant string.
407
408    Sets *FIRST_COLUMN to the 1-based column number of the start of
409    the extracted field, and *LAST_COLUMN to the end of the extracted
410    field.
411
412    Returns true on success, false on failure. */
413 static bool
414 cut_field (const struct data_parser *parser, struct dfm_reader *reader,
415            int *first_column, int *last_column, struct string *tmp,
416            struct substring *field)
417 {
418   size_t length_before_separators;
419   struct substring line, p;
420   bool quoted;
421
422   if (dfm_eof (reader))
423     return false;
424   if (ss_is_empty (parser->hard_seps))
425     dfm_expand_tabs (reader);
426   line = p = dfm_get_record (reader);
427
428   /* Skip leading soft separators. */
429   ss_ltrim (&p, parser->soft_seps);
430
431   /* Handle empty or completely consumed lines. */
432   if (ss_is_empty (p))
433     {
434       if (!parser->empty_line_has_field || dfm_columns_past_end (reader) > 0)
435         return false;
436       else
437         {
438           *field = p;
439           *first_column = dfm_column_start (reader);
440           *last_column = *first_column + 1;
441           dfm_forward_columns (reader, 1);
442           return true;
443         }
444     }
445
446   *first_column = dfm_column_start (reader);
447   quoted = ss_find_byte (parser->quotes, ss_first (p)) != SIZE_MAX;
448   if (quoted)
449     {
450       /* Quoted field. */
451       int quote = ss_get_byte (&p);
452       if (!ss_get_until (&p, quote, field))
453         msg (DW, _("Quoted string extends beyond end of line."));
454       if (parser->quote_escape && ss_first (p) == quote)
455         {
456           ds_assign_substring (tmp, *field);
457           while (ss_match_byte (&p, quote))
458             {
459               struct substring ss;
460               ds_put_byte (tmp, quote);
461               if (!ss_get_until (&p, quote, &ss))
462                 msg (DW, _("Quoted string extends beyond end of line."));
463               ds_put_substring (tmp, ss);
464             }
465           *field = ds_ss (tmp);
466         }
467       *last_column = *first_column + (ss_length (line) - ss_length (p));
468     }
469   else
470     {
471       /* Regular field. */
472       ss_get_bytes (&p, ss_cspan (p, ds_ss (&parser->any_sep)), field);
473       *last_column = *first_column + ss_length (*field);
474     }
475
476   /* Skip trailing soft separator and a single hard separator if present. */
477   length_before_separators = ss_length (p);
478   ss_ltrim (&p, parser->soft_seps);
479   if (!ss_is_empty (p)
480       && ss_find_byte (parser->hard_seps, ss_first (p)) != SIZE_MAX)
481     {
482       ss_advance (&p, 1);
483       ss_ltrim (&p, parser->soft_seps);
484     }
485   if (ss_is_empty (p))
486     dfm_forward_columns (reader, 1);
487   else if (quoted && length_before_separators == ss_length (p))
488     msg (DW, _("Missing delimiter following quoted string."));
489   dfm_forward_columns (reader, ss_length (line) - ss_length (p));
490
491   return true;
492 }
493
494 static void
495 parse_error (const struct dfm_reader *reader, const struct field *field,
496              int first_column, int last_column, char *error)
497 {
498   int line_number = dfm_get_line_number (reader);
499   const struct msg_location location = {
500     .file_name = CONST_CAST (char *, dfm_get_file_name (reader)),
501     .first_line = line_number,
502     .last_line = line_number + 1,
503     .first_column = first_column,
504     .last_column = last_column,
505   };
506   struct msg m = {
507     .category = MSG_C_DATA,
508     .severity = MSG_S_WARNING,
509     .location = CONST_CAST (struct msg_location *, &location),
510     .text = xasprintf (_("Data for variable %s is not valid as format %s: %s"),
511                        field->name, fmt_name (field->format.type), error),
512   };
513   msg_emit (&m);
514
515   free (error);
516 }
517
518 /* Reads a case from READER into C, parsing it according to
519    fixed-format syntax rules in PARSER.
520    Returns true if successful, false at end of file or on I/O error. */
521 static bool
522 parse_fixed (const struct data_parser *parser, struct dfm_reader *reader,
523              struct ccase *c)
524 {
525   const char *input_encoding = dfm_reader_get_encoding (reader);
526   const char *output_encoding = dict_get_encoding (parser->dict);
527   struct field *f;
528   int row;
529
530   if (dfm_eof (reader))
531     return false;
532
533   f = parser->fields;
534   for (row = 1; row <= parser->records_per_case; row++)
535     {
536       struct substring line;
537
538       if (dfm_eof (reader))
539         {
540           msg (DW, _("Partial case of %d of %d records discarded."),
541                row - 1, parser->records_per_case);
542           return false;
543         }
544       dfm_expand_tabs (reader);
545       line = dfm_get_record (reader);
546
547       for (; f < &parser->fields[parser->field_cnt] && f->record == row; f++)
548         {
549           struct substring s = ss_substr (line, f->first_column - 1,
550                                           f->format.w);
551           union value *value = case_data_rw_idx (c, f->case_idx);
552           char *error = data_in (s, input_encoding, f->format.type,
553                                  settings_get_fmt_settings (),
554                                  value, fmt_var_width (&f->format),
555                                  output_encoding);
556
557           if (error == NULL)
558             data_in_imply_decimals (s, input_encoding, f->format.type,
559                                     f->format.d, settings_get_fmt_settings (),
560                                     value);
561           else
562             parse_error (reader, f, f->first_column,
563                          f->first_column + f->format.w, error);
564         }
565
566       dfm_forward_record (reader);
567     }
568
569   return true;
570 }
571
572 /* Reads a case from READER into C, parsing it according to
573    free-format syntax rules in PARSER.
574    Returns true if successful, false at end of file or on I/O error. */
575 static bool
576 parse_delimited_span (const struct data_parser *parser,
577                       struct dfm_reader *reader, struct ccase *c)
578 {
579   const char *output_encoding = dict_get_encoding (parser->dict);
580   struct string tmp = DS_EMPTY_INITIALIZER;
581   struct field *f;
582
583   for (f = parser->fields; f < &parser->fields[parser->field_cnt]; f++)
584     {
585       struct substring s;
586       int first_column, last_column;
587       char *error;
588
589       /* Cut out a field and read in a new record if necessary. */
590       while (!cut_field (parser, reader,
591                          &first_column, &last_column, &tmp, &s))
592         {
593           if (!dfm_eof (reader))
594             dfm_forward_record (reader);
595           if (dfm_eof (reader))
596             {
597               if (f > parser->fields)
598                 msg (DW, _("Partial case discarded.  The first variable "
599                            "missing was %s."), f->name);
600               ds_destroy (&tmp);
601               return false;
602             }
603         }
604
605       const char *input_encoding = dfm_reader_get_encoding (reader);
606       error = data_in (s, input_encoding, f->format.type,
607                        settings_get_fmt_settings (),
608                        case_data_rw_idx (c, f->case_idx),
609                        fmt_var_width (&f->format), output_encoding);
610       if (error != NULL)
611         parse_error (reader, f, first_column, last_column, error);
612     }
613   ds_destroy (&tmp);
614   return true;
615 }
616
617 /* Reads a case from READER into C, parsing it according to
618    delimited syntax rules with one case per record in PARSER.
619    Returns true if successful, false at end of file or on I/O error. */
620 static bool
621 parse_delimited_no_span (const struct data_parser *parser,
622                          struct dfm_reader *reader, struct ccase *c)
623 {
624   const char *output_encoding = dict_get_encoding (parser->dict);
625   struct string tmp = DS_EMPTY_INITIALIZER;
626   struct substring s;
627   struct field *f, *end;
628
629   if (dfm_eof (reader))
630     return false;
631
632   end = &parser->fields[parser->field_cnt];
633   for (f = parser->fields; f < end; f++)
634     {
635       int first_column, last_column;
636       char *error;
637
638       if (!cut_field (parser, reader, &first_column, &last_column, &tmp, &s))
639         {
640           if (f < end - 1 && settings_get_undefined () && parser->warn_missing_fields)
641             msg (DW, _("Missing value(s) for all variables from %s onward.  "
642                        "These will be filled with the system-missing value "
643                        "or blanks, as appropriate."),
644                  f->name);
645           for (; f < end; f++)
646             value_set_missing (case_data_rw_idx (c, f->case_idx),
647                                fmt_var_width (&f->format));
648           goto exit;
649         }
650
651       const char *input_encoding = dfm_reader_get_encoding (reader);
652       error = data_in (s, input_encoding, f->format.type,
653                        settings_get_fmt_settings (),
654                        case_data_rw_idx (c, f->case_idx),
655                        fmt_var_width (&f->format), output_encoding);
656       if (error != NULL)
657         parse_error (reader, f, first_column, last_column, error);
658     }
659
660   s = dfm_get_record (reader);
661   ss_ltrim (&s, parser->soft_seps);
662   if (!ss_is_empty (s))
663     msg (DW, _("Record ends in data not part of any field."));
664
665 exit:
666   dfm_forward_record (reader);
667   ds_destroy (&tmp);
668   return true;
669 }
670 \f
671 /* Displays a table giving information on fixed-format variable
672    parsing on DATA LIST. */
673 static void
674 dump_fixed_table (const struct data_parser *parser,
675                   const struct file_handle *fh)
676 {
677   /* XXX This should not be preformatted. */
678   char *title = xasprintf (ngettext ("Reading %d record from %s.",
679                                      "Reading %d records from %s.",
680                                      parser->records_per_case),
681                            parser->records_per_case, fh_get_name (fh));
682   struct pivot_table *table = pivot_table_create__ (
683     pivot_value_new_user_text (title, -1), "Fixed Data Records");
684   free (title);
685
686   pivot_dimension_create (
687     table, PIVOT_AXIS_COLUMN, N_("Attributes"),
688     N_("Record"), N_("Columns"), N_("Format"));
689
690   struct pivot_dimension *variables = pivot_dimension_create (
691     table, PIVOT_AXIS_ROW, N_("Variable"));
692   variables->root->show_label = true;
693   for (size_t i = 0; i < parser->field_cnt; i++)
694     {
695       struct field *f = &parser->fields[i];
696
697       /* XXX It would be better to have the actual variable here. */
698       int variable_idx = pivot_category_create_leaf (
699         variables->root, pivot_value_new_user_text (f->name, -1));
700
701       pivot_table_put2 (table, 0, variable_idx,
702                         pivot_value_new_integer (f->record));
703
704       int first_column = f->first_column;
705       int last_column = f->first_column + f->format.w - 1;
706       char *columns = xasprintf ("%d-%d", first_column, last_column);
707       pivot_table_put2 (table, 1, variable_idx,
708                         pivot_value_new_user_text (columns, -1));
709       free (columns);
710
711       char str[FMT_STRING_LEN_MAX + 1];
712       pivot_table_put2 (table, 2, variable_idx,
713                         pivot_value_new_user_text (
714                           fmt_to_string (&f->format, str), -1));
715
716     }
717
718   pivot_table_submit (table);
719 }
720
721 /* Displays a table giving information on free-format variable parsing
722    on DATA LIST. */
723 static void
724 dump_delimited_table (const struct data_parser *parser,
725                       const struct file_handle *fh)
726 {
727   struct pivot_table *table = pivot_table_create__ (
728     pivot_value_new_text_format (N_("Reading free-form data from %s."),
729                                  fh_get_name (fh)),
730     "Free-Form Data Records");
731
732   pivot_dimension_create (
733     table, PIVOT_AXIS_COLUMN, N_("Attributes"), N_("Format"));
734
735   struct pivot_dimension *variables = pivot_dimension_create (
736     table, PIVOT_AXIS_ROW, N_("Variable"));
737   variables->root->show_label = true;
738   for (size_t i = 0; i < parser->field_cnt; i++)
739     {
740       struct field *f = &parser->fields[i];
741
742       /* XXX It would be better to have the actual variable here. */
743       int variable_idx = pivot_category_create_leaf (
744         variables->root, pivot_value_new_user_text (f->name, -1));
745
746       char str[FMT_STRING_LEN_MAX + 1];
747       pivot_table_put2 (table, 0, variable_idx,
748                         pivot_value_new_user_text (
749                           fmt_to_string (&f->format, str), -1));
750     }
751
752   pivot_table_submit (table);
753 }
754
755 /* Displays a table giving information on how PARSER will read
756    data from FH. */
757 void
758 data_parser_output_description (struct data_parser *parser,
759                                 const struct file_handle *fh)
760 {
761   if (parser->type == DP_FIXED)
762     dump_fixed_table (parser, fh);
763   else
764     dump_delimited_table (parser, fh);
765 }
766 \f
767 /* Data parser input program. */
768 struct data_parser_casereader
769   {
770     struct data_parser *parser; /* Parser. */
771     struct dfm_reader *reader;  /* Data file reader. */
772     struct caseproto *proto;    /* Format of cases. */
773   };
774
775 static const struct casereader_class data_parser_casereader_class;
776
777 /* Replaces DS's active dataset by an input program that reads data
778    from READER according to the rules in PARSER, using DICT as
779    the underlying dictionary.  Ownership of PARSER and READER is
780    transferred to the input program, and ownership of DICT is
781    transferred to the dataset. */
782 void
783 data_parser_make_active_file (struct data_parser *parser, struct dataset *ds,
784                                struct dfm_reader *reader,
785                                struct dictionary *dict,
786                                struct casereader* (*func)(struct casereader *,
787                                                           const struct dictionary *,
788                                                           void *),
789                                void *ud)
790 {
791   struct data_parser_casereader *r;
792   struct casereader *casereader0;
793   struct casereader *casereader1;
794
795   r = xmalloc (sizeof *r);
796   r->parser = parser;
797   r->reader = reader;
798   r->proto = caseproto_ref (dict_get_proto (dict));
799   casereader0 = casereader_create_sequential (NULL, r->proto,
800                                              CASENUMBER_MAX,
801                                              &data_parser_casereader_class, r);
802
803   if (func)
804     casereader1 = func (casereader0, dict, ud);
805   else
806     casereader1 = casereader0;
807
808   dataset_set_dict (ds, dict);
809   dataset_set_source (ds, casereader1);
810 }
811
812
813 static struct ccase *
814 data_parser_casereader_read (struct casereader *reader UNUSED, void *r_)
815 {
816   struct data_parser_casereader *r = r_;
817   struct ccase *c = case_create (r->proto);
818   if (data_parser_parse (r->parser, r->reader, c))
819     return c;
820   else
821     {
822       case_unref (c);
823       return NULL;
824     }
825 }
826
827 static void
828 data_parser_casereader_destroy (struct casereader *reader, void *r_)
829 {
830   struct data_parser_casereader *r = r_;
831   if (dfm_reader_error (r->reader))
832     casereader_force_error (reader);
833   dfm_close_reader (r->reader);
834   caseproto_unref (r->proto);
835   data_parser_destroy (r->parser);
836   free (r);
837 }
838
839 static const struct casereader_class data_parser_casereader_class =
840   {
841     data_parser_casereader_read,
842     data_parser_casereader_destroy,
843     NULL,
844     NULL,
845   };