818c15a30f2ab09115b25b5ff7e7dc1f711eb738
[pspp] / src / language / data-io / data-parser.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013, 2016 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "language/data-io/data-parser.h"
20
21 #include <stdint.h>
22 #include <stdlib.h>
23
24 #include "data/casereader-provider.h"
25 #include "data/data-in.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/file-handle-def.h"
30 #include "data/settings.h"
31 #include "language/data-io/data-reader.h"
32 #include "libpspp/message.h"
33 #include "libpspp/str.h"
34 #include "output/pivot-table.h"
35
36 #include "gl/xalloc.h"
37
38 #include "gettext.h"
39 #define N_(msgid) msgid
40 #define _(msgid) gettext (msgid)
41
42 /* Data parser for textual data like that read by DATA LIST. */
43 struct data_parser
44   {
45     struct dictionary *dict;    /* Dictionary of destination */
46     enum data_parser_type type; /* Type of data to parse. */
47     int skip_records;           /* Records to skip before first real data. */
48
49     struct field *fields;       /* Fields to parse. */
50     size_t field_cnt;           /* Number of fields. */
51     size_t field_allocated;     /* Number of fields spaced allocated for. */
52
53     /* DP_DELIMITED parsers only. */
54     bool span;                  /* May cases span multiple records? */
55     bool empty_line_has_field;  /* Does an empty line have an (empty) field? */
56     bool warn_missing_fields;   /* Should missing fields be considered errors? */
57     struct substring quotes;    /* Characters that can quote separators. */
58     bool quote_escape;          /* Doubled quote acts as escape? */
59     struct substring soft_seps; /* Two soft separators act like just one. */
60     struct substring hard_seps; /* Two hard separators yield empty fields. */
61     struct string any_sep;      /* Concatenation of soft_seps and hard_seps. */
62
63     /* DP_FIXED parsers only. */
64     int records_per_case;       /* Number of records in each case. */
65   };
66
67 /* How to parse one variable. */
68 struct field
69   {
70     struct fmt_spec format;     /* Input format of this field. */
71     int case_idx;               /* First value in case. */
72     char *name;                 /* Var name for error messages and tables. */
73
74     /* DP_FIXED only. */
75     int record;                 /* Record number (1-based). */
76     int first_column;           /* First column in record (1-based). */
77   };
78
79 static void set_any_sep (struct data_parser *parser);
80
81 /* Creates and returns a new data parser. */
82 struct data_parser *
83 data_parser_create (struct dictionary *dict)
84 {
85   struct data_parser *parser = xmalloc (sizeof *parser);
86
87   parser->type = DP_FIXED;
88   parser->skip_records = 0;
89
90   parser->fields = NULL;
91   parser->field_cnt = 0;
92   parser->field_allocated = 0;
93   parser->dict = dict_ref (dict);
94
95   parser->span = true;
96   parser->empty_line_has_field = false;
97   parser->warn_missing_fields = true;
98   ss_alloc_substring (&parser->quotes, ss_cstr ("\"'"));
99   parser->quote_escape = false;
100   ss_alloc_substring (&parser->soft_seps, ss_cstr (CC_SPACES));
101   ss_alloc_substring (&parser->hard_seps, ss_cstr (","));
102   ds_init_empty (&parser->any_sep);
103   set_any_sep (parser);
104
105   parser->records_per_case = 0;
106
107   return parser;
108 }
109
110 /* Destroys PARSER. */
111 void
112 data_parser_destroy (struct data_parser *parser)
113 {
114   if (parser != NULL)
115     {
116       size_t i;
117
118       dict_unref (parser->dict);
119       for (i = 0; i < parser->field_cnt; i++)
120         free (parser->fields[i].name);
121       free (parser->fields);
122       ss_dealloc (&parser->quotes);
123       ss_dealloc (&parser->soft_seps);
124       ss_dealloc (&parser->hard_seps);
125       ds_destroy (&parser->any_sep);
126       free (parser);
127     }
128 }
129
130 /* Returns the type of PARSER (either DP_DELIMITED or DP_FIXED). */
131 enum data_parser_type
132 data_parser_get_type (const struct data_parser *parser)
133 {
134   return parser->type;
135 }
136
137 /* Sets the type of PARSER to TYPE (either DP_DELIMITED or
138    DP_FIXED). */
139 void
140 data_parser_set_type (struct data_parser *parser, enum data_parser_type type)
141 {
142   assert (parser->field_cnt == 0);
143   assert (type == DP_FIXED || type == DP_DELIMITED);
144   parser->type = type;
145 }
146
147 /* Configures PARSER to skip the specified number of
148    INITIAL_RECORDS_TO_SKIP before parsing any data.  By default,
149    no records are skipped. */
150 void
151 data_parser_set_skip (struct data_parser *parser, int initial_records_to_skip)
152 {
153   assert (initial_records_to_skip >= 0);
154   parser->skip_records = initial_records_to_skip;
155 }
156
157 /* Returns true if PARSER is configured to allow cases to span
158    multiple records. */
159 bool
160 data_parser_get_span (const struct data_parser *parser)
161 {
162   return parser->span;
163 }
164
165 /* If MAY_CASES_SPAN_RECORDS is true, configures PARSER to allow
166    a single case to span multiple records and multiple cases to
167    occupy a single record.  If MAY_CASES_SPAN_RECORDS is false,
168    configures PARSER to require each record to contain exactly
169    one case.
170
171    This setting affects parsing of DP_DELIMITED files only. */
172 void
173 data_parser_set_span (struct data_parser *parser, bool may_cases_span_records)
174 {
175   parser->span = may_cases_span_records;
176 }
177
178 /* If EMPTY_LINE_HAS_FIELD is true, configures PARSER to parse an
179    empty line as an empty field and to treat a hard delimiter
180    followed by end-of-line as an empty field.  If
181    EMPTY_LINE_HAS_FIELD is false, PARSER will skip empty lines
182    and hard delimiters at the end of lines without emitting empty
183    fields.
184
185    This setting affects parsing of DP_DELIMITED files only. */
186 void
187 data_parser_set_empty_line_has_field (struct data_parser *parser,
188                                       bool empty_line_has_field)
189 {
190   parser->empty_line_has_field = empty_line_has_field;
191 }
192
193
194 /* If WARN_MISSING_FIELDS is true, configures PARSER to emit a warning
195    and cause an error condition when a missing field is encountered.
196    If  WARN_MISSING_FIELDS is false, PARSER will silently fill such
197    fields with the system missing value.
198
199    This setting affects parsing of DP_DELIMITED files only. */
200 void
201 data_parser_set_warn_missing_fields (struct data_parser *parser,
202                                      bool warn_missing_fields)
203 {
204   parser->warn_missing_fields = warn_missing_fields;
205 }
206
207
208 /* Sets the characters that may be used for quoting field
209    contents to QUOTES.  If QUOTES is empty, quoting will be
210    disabled.
211
212    The caller retains ownership of QUOTES.
213
214    This setting affects parsing of DP_DELIMITED files only. */
215 void
216 data_parser_set_quotes (struct data_parser *parser, struct substring quotes)
217 {
218   ss_dealloc (&parser->quotes);
219   ss_alloc_substring (&parser->quotes, quotes);
220 }
221
222 /* If ESCAPE is false (the default setting), a character used for
223    quoting cannot itself be embedded within a quoted field.  If
224    ESCAPE is true, then a quote character can be embedded within
225    a quoted field by doubling it.
226
227    This setting affects parsing of DP_DELIMITED files only, and
228    only when at least one quote character has been set (with
229    data_parser_set_quotes). */
230 void
231 data_parser_set_quote_escape (struct data_parser *parser, bool escape)
232 {
233   parser->quote_escape = escape;
234 }
235
236 /* Sets PARSER's soft delimiters to DELIMITERS.  Soft delimiters
237    separate fields, but consecutive soft delimiters do not yield
238    empty fields.  (Ordinarily, only white space characters are
239    appropriate soft delimiters.)
240
241    The caller retains ownership of DELIMITERS.
242
243    This setting affects parsing of DP_DELIMITED files only. */
244 void
245 data_parser_set_soft_delimiters (struct data_parser *parser,
246                                  struct substring delimiters)
247 {
248   ss_dealloc (&parser->soft_seps);
249   ss_alloc_substring (&parser->soft_seps, delimiters);
250   set_any_sep (parser);
251 }
252
253 /* Sets PARSER's hard delimiters to DELIMITERS.  Hard delimiters
254    separate fields.  A consecutive pair of hard delimiters yield
255    an empty field.
256
257    The caller retains ownership of DELIMITERS.
258
259    This setting affects parsing of DP_DELIMITED files only. */
260 void
261 data_parser_set_hard_delimiters (struct data_parser *parser,
262                                  struct substring delimiters)
263 {
264   ss_dealloc (&parser->hard_seps);
265   ss_alloc_substring (&parser->hard_seps, delimiters);
266   set_any_sep (parser);
267 }
268
269 /* Returns the number of records per case. */
270 int
271 data_parser_get_records (const struct data_parser *parser)
272 {
273   return parser->records_per_case;
274 }
275
276 /* Sets the number of records per case to RECORDS_PER_CASE.
277
278    This setting affects parsing of DP_FIXED files only. */
279 void
280 data_parser_set_records (struct data_parser *parser, int records_per_case)
281 {
282   assert (records_per_case >= 0);
283   assert (records_per_case >= parser->records_per_case);
284   parser->records_per_case = records_per_case;
285 }
286
287 static void
288 add_field (struct data_parser *p, const struct fmt_spec *format, int case_idx,
289            const char *name, int record, int first_column)
290 {
291   struct field *field;
292
293   if (p->field_cnt == p->field_allocated)
294     p->fields = x2nrealloc (p->fields, &p->field_allocated, sizeof *p->fields);
295   field = &p->fields[p->field_cnt++];
296   field->format = *format;
297   field->case_idx = case_idx;
298   field->name = xstrdup (name);
299   field->record = record;
300   field->first_column = first_column;
301 }
302
303 /* Adds a delimited field to the field parsed by PARSER, which
304    must be configured as a DP_DELIMITED parser.  The field is
305    parsed as input format FORMAT.  Its data will be stored into case
306    index CASE_INDEX.  Errors in input data will be reported
307    against variable NAME. */
308 void
309 data_parser_add_delimited_field (struct data_parser *parser,
310                                  const struct fmt_spec *format, int case_idx,
311                                  const char *name)
312 {
313   assert (parser->type == DP_DELIMITED);
314   add_field (parser, format, case_idx, name, 0, 0);
315 }
316
317 /* Adds a fixed field to the field parsed by PARSER, which
318    must be configured as a DP_FIXED parser.  The field is
319    parsed as input format FORMAT.  Its data will be stored into case
320    index CASE_INDEX.  Errors in input data will be reported
321    against variable NAME.  The field will be drawn from the
322    FORMAT->w columns in 1-based RECORD starting at 1-based
323    column FIRST_COLUMN.
324
325    RECORD must be at least as great as that of any field already
326    added; that is, fields must be added in increasing order of
327    record number.  If RECORD is greater than the current number
328    of records per case, the number of records per case are
329    increased as needed.  */
330 void
331 data_parser_add_fixed_field (struct data_parser *parser,
332                              const struct fmt_spec *format, int case_idx,
333                              const char *name,
334                              int record, int first_column)
335 {
336   assert (parser->type == DP_FIXED);
337   assert (parser->field_cnt == 0
338           || record >= parser->fields[parser->field_cnt - 1].record);
339   if (record > parser->records_per_case)
340     parser->records_per_case = record;
341   add_field (parser, format, case_idx, name, record, first_column);
342 }
343
344 /* Returns true if any fields have been added to PARSER, false
345    otherwise. */
346 bool
347 data_parser_any_fields (const struct data_parser *parser)
348 {
349   return parser->field_cnt > 0;
350 }
351
352 static void
353 set_any_sep (struct data_parser *parser)
354 {
355   ds_assign_substring (&parser->any_sep, parser->soft_seps);
356   ds_put_substring (&parser->any_sep, parser->hard_seps);
357 }
358 \f
359 static bool parse_delimited_span (const struct data_parser *,
360                                   struct dfm_reader *, struct ccase *);
361 static bool parse_delimited_no_span (const struct data_parser *,
362                                      struct dfm_reader *, struct ccase *);
363 static bool parse_fixed (const struct data_parser *,
364                          struct dfm_reader *, struct ccase *);
365
366 /* Reads a case from DFM into C, parsing it with PARSER.  Returns
367    true if successful, false at end of file or on I/O error.
368
369    Case C must not be shared. */
370 bool
371 data_parser_parse (struct data_parser *parser, struct dfm_reader *reader,
372                    struct ccase *c)
373 {
374   bool retval;
375
376   assert (!case_is_shared (c));
377   assert (data_parser_any_fields (parser));
378
379   /* Skip the requested number of records before reading the
380      first case. */
381   for (; parser->skip_records > 0; parser->skip_records--)
382     {
383       if (dfm_eof (reader))
384         return false;
385       dfm_forward_record (reader);
386     }
387
388   /* Limit cases. */
389   if (parser->type == DP_DELIMITED)
390     {
391       if (parser->span)
392         retval = parse_delimited_span (parser, reader, c);
393       else
394         retval = parse_delimited_no_span (parser, reader, c);
395     }
396   else
397     retval = parse_fixed (parser, reader, c);
398
399   return retval;
400 }
401
402 /* Extracts a delimited field from the current position in the
403    current record according to PARSER, reading data from READER.
404
405    *FIELD is set to the field content.  The caller must not or
406    destroy this constant string.
407
408    Sets *FIRST_COLUMN to the 1-based column number of the start of
409    the extracted field, and *LAST_COLUMN to the end of the extracted
410    field.
411
412    Returns true on success, false on failure. */
413 static bool
414 cut_field (const struct data_parser *parser, struct dfm_reader *reader,
415            int *first_column, int *last_column, struct string *tmp,
416            struct substring *field)
417 {
418   size_t length_before_separators;
419   struct substring line, p;
420   bool quoted;
421
422   if (dfm_eof (reader))
423     return false;
424   if (ss_is_empty (parser->hard_seps))
425     dfm_expand_tabs (reader);
426   line = p = dfm_get_record (reader);
427
428   /* Skip leading soft separators. */
429   ss_ltrim (&p, parser->soft_seps);
430
431   /* Handle empty or completely consumed lines. */
432   if (ss_is_empty (p))
433     {
434       if (!parser->empty_line_has_field || dfm_columns_past_end (reader) > 0)
435         return false;
436       else
437         {
438           *field = p;
439           *first_column = dfm_column_start (reader);
440           *last_column = *first_column + 1;
441           dfm_forward_columns (reader, 1);
442           return true;
443         }
444     }
445
446   *first_column = dfm_column_start (reader);
447   quoted = ss_find_byte (parser->quotes, ss_first (p)) != SIZE_MAX;
448   if (quoted)
449     {
450       /* Quoted field. */
451       int quote = ss_get_byte (&p);
452       if (!ss_get_until (&p, quote, field))
453         msg (DW, _("Quoted string extends beyond end of line."));
454       if (parser->quote_escape && ss_first (p) == quote)
455         {
456           ds_assign_substring (tmp, *field);
457           while (ss_match_byte (&p, quote))
458             {
459               struct substring ss;
460               ds_put_byte (tmp, quote);
461               if (!ss_get_until (&p, quote, &ss))
462                 msg (DW, _("Quoted string extends beyond end of line."));
463               ds_put_substring (tmp, ss);
464             }
465           *field = ds_ss (tmp);
466         }
467       *last_column = *first_column + (ss_length (line) - ss_length (p));
468     }
469   else
470     {
471       /* Regular field. */
472       ss_get_bytes (&p, ss_cspan (p, ds_ss (&parser->any_sep)), field);
473       *last_column = *first_column + ss_length (*field);
474     }
475
476   /* Skip trailing soft separator and a single hard separator if present. */
477   length_before_separators = ss_length (p);
478   ss_ltrim (&p, parser->soft_seps);
479   if (!ss_is_empty (p)
480       && ss_find_byte (parser->hard_seps, ss_first (p)) != SIZE_MAX)
481     {
482       ss_advance (&p, 1);
483       ss_ltrim (&p, parser->soft_seps);
484     }
485   if (ss_is_empty (p))
486     dfm_forward_columns (reader, 1);
487   else if (quoted && length_before_separators == ss_length (p))
488     msg (DW, _("Missing delimiter following quoted string."));
489   dfm_forward_columns (reader, ss_length (line) - ss_length (p));
490
491   return true;
492 }
493
494 static void
495 parse_error (const struct dfm_reader *reader, const struct field *field,
496              int first_column, int last_column, char *error)
497 {
498   struct msg m = {
499     .category = MSG_C_DATA,
500     .severity = MSG_S_WARNING,
501     .file_name = CONST_CAST (char *, dfm_get_file_name (reader)),
502     .first_line = dfm_get_line_number (reader),
503     .last_line = m.first_line + 1,
504     .first_column = first_column,
505     .last_column = last_column,
506     .text = xasprintf (_("Data for variable %s is not valid as format %s: %s"),
507                        field->name, fmt_name (field->format.type), error),
508   };
509   msg_emit (&m);
510
511   free (error);
512 }
513
514 /* Reads a case from READER into C, parsing it according to
515    fixed-format syntax rules in PARSER.
516    Returns true if successful, false at end of file or on I/O error. */
517 static bool
518 parse_fixed (const struct data_parser *parser, struct dfm_reader *reader,
519              struct ccase *c)
520 {
521   const char *input_encoding = dfm_reader_get_encoding (reader);
522   const char *output_encoding = dict_get_encoding (parser->dict);
523   struct field *f;
524   int row;
525
526   if (dfm_eof (reader))
527     return false;
528
529   f = parser->fields;
530   for (row = 1; row <= parser->records_per_case; row++)
531     {
532       struct substring line;
533
534       if (dfm_eof (reader))
535         {
536           msg (DW, _("Partial case of %d of %d records discarded."),
537                row - 1, parser->records_per_case);
538           return false;
539         }
540       dfm_expand_tabs (reader);
541       line = dfm_get_record (reader);
542
543       for (; f < &parser->fields[parser->field_cnt] && f->record == row; f++)
544         {
545           struct substring s = ss_substr (line, f->first_column - 1,
546                                           f->format.w);
547           union value *value = case_data_rw_idx (c, f->case_idx);
548           char *error = data_in (s, input_encoding, f->format.type,
549                                  settings_get_fmt_settings (),
550                                  value, fmt_var_width (&f->format),
551                                  output_encoding);
552
553           if (error == NULL)
554             data_in_imply_decimals (s, input_encoding, f->format.type,
555                                     f->format.d, settings_get_fmt_settings (),
556                                     value);
557           else
558             parse_error (reader, f, f->first_column,
559                          f->first_column + f->format.w, error);
560         }
561
562       dfm_forward_record (reader);
563     }
564
565   return true;
566 }
567
568 /* Reads a case from READER into C, parsing it according to
569    free-format syntax rules in PARSER.
570    Returns true if successful, false at end of file or on I/O error. */
571 static bool
572 parse_delimited_span (const struct data_parser *parser,
573                       struct dfm_reader *reader, struct ccase *c)
574 {
575   const char *output_encoding = dict_get_encoding (parser->dict);
576   struct string tmp = DS_EMPTY_INITIALIZER;
577   struct field *f;
578
579   for (f = parser->fields; f < &parser->fields[parser->field_cnt]; f++)
580     {
581       struct substring s;
582       int first_column, last_column;
583       char *error;
584
585       /* Cut out a field and read in a new record if necessary. */
586       while (!cut_field (parser, reader,
587                          &first_column, &last_column, &tmp, &s))
588         {
589           if (!dfm_eof (reader))
590             dfm_forward_record (reader);
591           if (dfm_eof (reader))
592             {
593               if (f > parser->fields)
594                 msg (DW, _("Partial case discarded.  The first variable "
595                            "missing was %s."), f->name);
596               ds_destroy (&tmp);
597               return false;
598             }
599         }
600
601       const char *input_encoding = dfm_reader_get_encoding (reader);
602       error = data_in (s, input_encoding, f->format.type,
603                        settings_get_fmt_settings (),
604                        case_data_rw_idx (c, f->case_idx),
605                        fmt_var_width (&f->format), output_encoding);
606       if (error != NULL)
607         parse_error (reader, f, first_column, last_column, error);
608     }
609   ds_destroy (&tmp);
610   return true;
611 }
612
613 /* Reads a case from READER into C, parsing it according to
614    delimited syntax rules with one case per record in PARSER.
615    Returns true if successful, false at end of file or on I/O error. */
616 static bool
617 parse_delimited_no_span (const struct data_parser *parser,
618                          struct dfm_reader *reader, struct ccase *c)
619 {
620   const char *output_encoding = dict_get_encoding (parser->dict);
621   struct string tmp = DS_EMPTY_INITIALIZER;
622   struct substring s;
623   struct field *f, *end;
624
625   if (dfm_eof (reader))
626     return false;
627
628   end = &parser->fields[parser->field_cnt];
629   for (f = parser->fields; f < end; f++)
630     {
631       int first_column, last_column;
632       char *error;
633
634       if (!cut_field (parser, reader, &first_column, &last_column, &tmp, &s))
635         {
636           if (f < end - 1 && settings_get_undefined () && parser->warn_missing_fields)
637             msg (DW, _("Missing value(s) for all variables from %s onward.  "
638                        "These will be filled with the system-missing value "
639                        "or blanks, as appropriate."),
640                  f->name);
641           for (; f < end; f++)
642             value_set_missing (case_data_rw_idx (c, f->case_idx),
643                                fmt_var_width (&f->format));
644           goto exit;
645         }
646
647       const char *input_encoding = dfm_reader_get_encoding (reader);
648       error = data_in (s, input_encoding, f->format.type,
649                        settings_get_fmt_settings (),
650                        case_data_rw_idx (c, f->case_idx),
651                        fmt_var_width (&f->format), output_encoding);
652       if (error != NULL)
653         parse_error (reader, f, first_column, last_column, error);
654     }
655
656   s = dfm_get_record (reader);
657   ss_ltrim (&s, parser->soft_seps);
658   if (!ss_is_empty (s))
659     msg (DW, _("Record ends in data not part of any field."));
660
661 exit:
662   dfm_forward_record (reader);
663   ds_destroy (&tmp);
664   return true;
665 }
666 \f
667 /* Displays a table giving information on fixed-format variable
668    parsing on DATA LIST. */
669 static void
670 dump_fixed_table (const struct data_parser *parser,
671                   const struct file_handle *fh)
672 {
673   /* XXX This should not be preformatted. */
674   char *title = xasprintf (ngettext ("Reading %d record from %s.",
675                                      "Reading %d records from %s.",
676                                      parser->records_per_case),
677                            parser->records_per_case, fh_get_name (fh));
678   struct pivot_table *table = pivot_table_create__ (
679     pivot_value_new_user_text (title, -1), "Fixed Data Records");
680   free (title);
681
682   pivot_dimension_create (
683     table, PIVOT_AXIS_COLUMN, N_("Attributes"),
684     N_("Record"), N_("Columns"), N_("Format"));
685
686   struct pivot_dimension *variables = pivot_dimension_create (
687     table, PIVOT_AXIS_ROW, N_("Variable"));
688   variables->root->show_label = true;
689   for (size_t i = 0; i < parser->field_cnt; i++)
690     {
691       struct field *f = &parser->fields[i];
692
693       /* XXX It would be better to have the actual variable here. */
694       int variable_idx = pivot_category_create_leaf (
695         variables->root, pivot_value_new_user_text (f->name, -1));
696
697       pivot_table_put2 (table, 0, variable_idx,
698                         pivot_value_new_integer (f->record));
699
700       int first_column = f->first_column;
701       int last_column = f->first_column + f->format.w - 1;
702       char *columns = xasprintf ("%d-%d", first_column, last_column);
703       pivot_table_put2 (table, 1, variable_idx,
704                         pivot_value_new_user_text (columns, -1));
705       free (columns);
706
707       char str[FMT_STRING_LEN_MAX + 1];
708       pivot_table_put2 (table, 2, variable_idx,
709                         pivot_value_new_user_text (
710                           fmt_to_string (&f->format, str), -1));
711
712     }
713
714   pivot_table_submit (table);
715 }
716
717 /* Displays a table giving information on free-format variable parsing
718    on DATA LIST. */
719 static void
720 dump_delimited_table (const struct data_parser *parser,
721                       const struct file_handle *fh)
722 {
723   struct pivot_table *table = pivot_table_create__ (
724     pivot_value_new_text_format (N_("Reading free-form data from %s."),
725                                  fh_get_name (fh)),
726     "Free-Form Data Records");
727
728   pivot_dimension_create (
729     table, PIVOT_AXIS_COLUMN, N_("Attributes"), N_("Format"));
730
731   struct pivot_dimension *variables = pivot_dimension_create (
732     table, PIVOT_AXIS_ROW, N_("Variable"));
733   variables->root->show_label = true;
734   for (size_t i = 0; i < parser->field_cnt; i++)
735     {
736       struct field *f = &parser->fields[i];
737
738       /* XXX It would be better to have the actual variable here. */
739       int variable_idx = pivot_category_create_leaf (
740         variables->root, pivot_value_new_user_text (f->name, -1));
741
742       char str[FMT_STRING_LEN_MAX + 1];
743       pivot_table_put2 (table, 0, variable_idx,
744                         pivot_value_new_user_text (
745                           fmt_to_string (&f->format, str), -1));
746     }
747
748   pivot_table_submit (table);
749 }
750
751 /* Displays a table giving information on how PARSER will read
752    data from FH. */
753 void
754 data_parser_output_description (struct data_parser *parser,
755                                 const struct file_handle *fh)
756 {
757   if (parser->type == DP_FIXED)
758     dump_fixed_table (parser, fh);
759   else
760     dump_delimited_table (parser, fh);
761 }
762 \f
763 /* Data parser input program. */
764 struct data_parser_casereader
765   {
766     struct data_parser *parser; /* Parser. */
767     struct dfm_reader *reader;  /* Data file reader. */
768     struct caseproto *proto;    /* Format of cases. */
769   };
770
771 static const struct casereader_class data_parser_casereader_class;
772
773 /* Replaces DS's active dataset by an input program that reads data
774    from READER according to the rules in PARSER, using DICT as
775    the underlying dictionary.  Ownership of PARSER and READER is
776    transferred to the input program, and ownership of DICT is
777    transferred to the dataset. */
778 void
779 data_parser_make_active_file (struct data_parser *parser, struct dataset *ds,
780                                struct dfm_reader *reader,
781                                struct dictionary *dict,
782                                struct casereader* (*func)(struct casereader *,
783                                                           const struct dictionary *,
784                                                           void *),
785                                void *ud)
786 {
787   struct data_parser_casereader *r;
788   struct casereader *casereader0;
789   struct casereader *casereader1;
790
791   r = xmalloc (sizeof *r);
792   r->parser = parser;
793   r->reader = reader;
794   r->proto = caseproto_ref (dict_get_proto (dict));
795   casereader0 = casereader_create_sequential (NULL, r->proto,
796                                              CASENUMBER_MAX,
797                                              &data_parser_casereader_class, r);
798
799   if (func)
800     casereader1 = func (casereader0, dict, ud);
801   else
802     casereader1 = casereader0;
803
804   dataset_set_dict (ds, dict);
805   dataset_set_source (ds, casereader1);
806 }
807
808
809 static struct ccase *
810 data_parser_casereader_read (struct casereader *reader UNUSED, void *r_)
811 {
812   struct data_parser_casereader *r = r_;
813   struct ccase *c = case_create (r->proto);
814   if (data_parser_parse (r->parser, r->reader, c))
815     return c;
816   else
817     {
818       case_unref (c);
819       return NULL;
820     }
821 }
822
823 static void
824 data_parser_casereader_destroy (struct casereader *reader, void *r_)
825 {
826   struct data_parser_casereader *r = r_;
827   if (dfm_reader_error (r->reader))
828     casereader_force_error (reader);
829   dfm_close_reader (r->reader);
830   caseproto_unref (r->proto);
831   data_parser_destroy (r->parser);
832   free (r);
833 }
834
835 static const struct casereader_class data_parser_casereader_class =
836   {
837     data_parser_casereader_read,
838     data_parser_casereader_destroy,
839     NULL,
840     NULL,
841   };