data_parser: Keep a reference to the dictionary
[pspp] / src / language / data-io / data-parser.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013, 2016 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "language/data-io/data-parser.h"
20
21 #include <stdint.h>
22 #include <stdlib.h>
23
24 #include "data/casereader-provider.h"
25 #include "data/data-in.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/file-handle-def.h"
30 #include "data/settings.h"
31 #include "language/data-io/data-reader.h"
32 #include "libpspp/message.h"
33 #include "libpspp/str.h"
34 #include "output/pivot-table.h"
35
36 #include "gl/xalloc.h"
37
38 #include "gettext.h"
39 #define N_(msgid) msgid
40 #define _(msgid) gettext (msgid)
41
42 /* Data parser for textual data like that read by DATA LIST. */
43 struct data_parser
44   {
45     struct dictionary *dict;    /* Dictionary of destination */
46     enum data_parser_type type; /* Type of data to parse. */
47     int skip_records;           /* Records to skip before first real data. */
48
49     struct field *fields;       /* Fields to parse. */
50     size_t field_cnt;           /* Number of fields. */
51     size_t field_allocated;     /* Number of fields spaced allocated for. */
52
53     /* DP_DELIMITED parsers only. */
54     bool span;                  /* May cases span multiple records? */
55     bool empty_line_has_field;  /* Does an empty line have an (empty) field? */
56     bool warn_missing_fields;   /* Should missing fields be considered errors? */
57     struct substring quotes;    /* Characters that can quote separators. */
58     bool quote_escape;          /* Doubled quote acts as escape? */
59     struct substring soft_seps; /* Two soft separators act like just one. */
60     struct substring hard_seps; /* Two hard separators yield empty fields. */
61     struct string any_sep;      /* Concatenation of soft_seps and hard_seps. */
62
63     /* DP_FIXED parsers only. */
64     int records_per_case;       /* Number of records in each case. */
65   };
66
67 /* How to parse one variable. */
68 struct field
69   {
70     struct fmt_spec format;     /* Input format of this field. */
71     int case_idx;               /* First value in case. */
72     char *name;                 /* Var name for error messages and tables. */
73
74     /* DP_FIXED only. */
75     int record;                 /* Record number (1-based). */
76     int first_column;           /* First column in record (1-based). */
77   };
78
79 static void set_any_sep (struct data_parser *parser);
80
81 /* Creates and returns a new data parser. */
82 struct data_parser *
83 data_parser_create (struct dictionary *dict)
84 {
85   struct data_parser *parser = xmalloc (sizeof *parser);
86
87   parser->type = DP_FIXED;
88   parser->skip_records = 0;
89
90   parser->fields = NULL;
91   parser->field_cnt = 0;
92   parser->field_allocated = 0;
93   parser->dict = dict_ref (dict);
94
95   parser->span = true;
96   parser->empty_line_has_field = false;
97   parser->warn_missing_fields = true;
98   ss_alloc_substring (&parser->quotes, ss_cstr ("\"'"));
99   parser->quote_escape = false;
100   ss_alloc_substring (&parser->soft_seps, ss_cstr (CC_SPACES));
101   ss_alloc_substring (&parser->hard_seps, ss_cstr (","));
102   ds_init_empty (&parser->any_sep);
103   set_any_sep (parser);
104
105   parser->records_per_case = 0;
106
107   return parser;
108 }
109
110 /* Destroys PARSER. */
111 void
112 data_parser_destroy (struct data_parser *parser)
113 {
114   if (parser != NULL)
115     {
116       size_t i;
117
118       dict_unref (parser->dict);
119       for (i = 0; i < parser->field_cnt; i++)
120         free (parser->fields[i].name);
121       free (parser->fields);
122       ss_dealloc (&parser->quotes);
123       ss_dealloc (&parser->soft_seps);
124       ss_dealloc (&parser->hard_seps);
125       ds_destroy (&parser->any_sep);
126       free (parser);
127     }
128 }
129
130 /* Returns the type of PARSER (either DP_DELIMITED or DP_FIXED). */
131 enum data_parser_type
132 data_parser_get_type (const struct data_parser *parser)
133 {
134   return parser->type;
135 }
136
137 /* Sets the type of PARSER to TYPE (either DP_DELIMITED or
138    DP_FIXED). */
139 void
140 data_parser_set_type (struct data_parser *parser, enum data_parser_type type)
141 {
142   assert (parser->field_cnt == 0);
143   assert (type == DP_FIXED || type == DP_DELIMITED);
144   parser->type = type;
145 }
146
147 /* Configures PARSER to skip the specified number of
148    INITIAL_RECORDS_TO_SKIP before parsing any data.  By default,
149    no records are skipped. */
150 void
151 data_parser_set_skip (struct data_parser *parser, int initial_records_to_skip)
152 {
153   assert (initial_records_to_skip >= 0);
154   parser->skip_records = initial_records_to_skip;
155 }
156
157 /* Returns true if PARSER is configured to allow cases to span
158    multiple records. */
159 bool
160 data_parser_get_span (const struct data_parser *parser)
161 {
162   return parser->span;
163 }
164
165 /* If MAY_CASES_SPAN_RECORDS is true, configures PARSER to allow
166    a single case to span multiple records and multiple cases to
167    occupy a single record.  If MAY_CASES_SPAN_RECORDS is false,
168    configures PARSER to require each record to contain exactly
169    one case.
170
171    This setting affects parsing of DP_DELIMITED files only. */
172 void
173 data_parser_set_span (struct data_parser *parser, bool may_cases_span_records)
174 {
175   parser->span = may_cases_span_records;
176 }
177
178 /* If EMPTY_LINE_HAS_FIELD is true, configures PARSER to parse an
179    empty line as an empty field and to treat a hard delimiter
180    followed by end-of-line as an empty field.  If
181    EMPTY_LINE_HAS_FIELD is false, PARSER will skip empty lines
182    and hard delimiters at the end of lines without emitting empty
183    fields.
184
185    This setting affects parsing of DP_DELIMITED files only. */
186 void
187 data_parser_set_empty_line_has_field (struct data_parser *parser,
188                                       bool empty_line_has_field)
189 {
190   parser->empty_line_has_field = empty_line_has_field;
191 }
192
193
194 /* If WARN_MISSING_FIELDS is true, configures PARSER to emit a warning
195    and cause an error condition when a missing field is encountered.
196    If  WARN_MISSING_FIELDS is false, PARSER will silently fill such
197    fields with the system missing value.
198
199    This setting affects parsing of DP_DELIMITED files only. */
200 void
201 data_parser_set_warn_missing_fields (struct data_parser *parser,
202                                      bool warn_missing_fields)
203 {
204   parser->warn_missing_fields = warn_missing_fields;
205 }
206
207
208 /* Sets the characters that may be used for quoting field
209    contents to QUOTES.  If QUOTES is empty, quoting will be
210    disabled.
211
212    The caller retains ownership of QUOTES.
213
214    This setting affects parsing of DP_DELIMITED files only. */
215 void
216 data_parser_set_quotes (struct data_parser *parser, struct substring quotes)
217 {
218   ss_dealloc (&parser->quotes);
219   ss_alloc_substring (&parser->quotes, quotes);
220 }
221
222 /* If ESCAPE is false (the default setting), a character used for
223    quoting cannot itself be embedded within a quoted field.  If
224    ESCAPE is true, then a quote character can be embedded within
225    a quoted field by doubling it.
226
227    This setting affects parsing of DP_DELIMITED files only, and
228    only when at least one quote character has been set (with
229    data_parser_set_quotes). */
230 void
231 data_parser_set_quote_escape (struct data_parser *parser, bool escape)
232 {
233   parser->quote_escape = escape;
234 }
235
236 /* Sets PARSER's soft delimiters to DELIMITERS.  Soft delimiters
237    separate fields, but consecutive soft delimiters do not yield
238    empty fields.  (Ordinarily, only white space characters are
239    appropriate soft delimiters.)
240
241    The caller retains ownership of DELIMITERS.
242
243    This setting affects parsing of DP_DELIMITED files only. */
244 void
245 data_parser_set_soft_delimiters (struct data_parser *parser,
246                                  struct substring delimiters)
247 {
248   ss_dealloc (&parser->soft_seps);
249   ss_alloc_substring (&parser->soft_seps, delimiters);
250   set_any_sep (parser);
251 }
252
253 /* Sets PARSER's hard delimiters to DELIMITERS.  Hard delimiters
254    separate fields.  A consecutive pair of hard delimiters yield
255    an empty field.
256
257    The caller retains ownership of DELIMITERS.
258
259    This setting affects parsing of DP_DELIMITED files only. */
260 void
261 data_parser_set_hard_delimiters (struct data_parser *parser,
262                                  struct substring delimiters)
263 {
264   ss_dealloc (&parser->hard_seps);
265   ss_alloc_substring (&parser->hard_seps, delimiters);
266   set_any_sep (parser);
267 }
268
269 /* Returns the number of records per case. */
270 int
271 data_parser_get_records (const struct data_parser *parser)
272 {
273   return parser->records_per_case;
274 }
275
276 /* Sets the number of records per case to RECORDS_PER_CASE.
277
278    This setting affects parsing of DP_FIXED files only. */
279 void
280 data_parser_set_records (struct data_parser *parser, int records_per_case)
281 {
282   assert (records_per_case >= 0);
283   assert (records_per_case >= parser->records_per_case);
284   parser->records_per_case = records_per_case;
285 }
286
287 static void
288 add_field (struct data_parser *p, const struct fmt_spec *format, int case_idx,
289            const char *name, int record, int first_column)
290 {
291   struct field *field;
292
293   if (p->field_cnt == p->field_allocated)
294     p->fields = x2nrealloc (p->fields, &p->field_allocated, sizeof *p->fields);
295   field = &p->fields[p->field_cnt++];
296   field->format = *format;
297   field->case_idx = case_idx;
298   field->name = xstrdup (name);
299   field->record = record;
300   field->first_column = first_column;
301 }
302
303 /* Adds a delimited field to the field parsed by PARSER, which
304    must be configured as a DP_DELIMITED parser.  The field is
305    parsed as input format FORMAT.  Its data will be stored into case
306    index CASE_INDEX.  Errors in input data will be reported
307    against variable NAME. */
308 void
309 data_parser_add_delimited_field (struct data_parser *parser,
310                                  const struct fmt_spec *format, int case_idx,
311                                  const char *name)
312 {
313   assert (parser->type == DP_DELIMITED);
314   add_field (parser, format, case_idx, name, 0, 0);
315 }
316
317 /* Adds a fixed field to the field parsed by PARSER, which
318    must be configured as a DP_FIXED parser.  The field is
319    parsed as input format FORMAT.  Its data will be stored into case
320    index CASE_INDEX.  Errors in input data will be reported
321    against variable NAME.  The field will be drawn from the
322    FORMAT->w columns in 1-based RECORD starting at 1-based
323    column FIRST_COLUMN.
324
325    RECORD must be at least as great as that of any field already
326    added; that is, fields must be added in increasing order of
327    record number.  If RECORD is greater than the current number
328    of records per case, the number of records per case are
329    increased as needed.  */
330 void
331 data_parser_add_fixed_field (struct data_parser *parser,
332                              const struct fmt_spec *format, int case_idx,
333                              const char *name,
334                              int record, int first_column)
335 {
336   assert (parser->type == DP_FIXED);
337   assert (parser->field_cnt == 0
338           || record >= parser->fields[parser->field_cnt - 1].record);
339   if (record > parser->records_per_case)
340     parser->records_per_case = record;
341   add_field (parser, format, case_idx, name, record, first_column);
342 }
343
344 /* Returns true if any fields have been added to PARSER, false
345    otherwise. */
346 bool
347 data_parser_any_fields (const struct data_parser *parser)
348 {
349   return parser->field_cnt > 0;
350 }
351
352 static void
353 set_any_sep (struct data_parser *parser)
354 {
355   ds_assign_substring (&parser->any_sep, parser->soft_seps);
356   ds_put_substring (&parser->any_sep, parser->hard_seps);
357 }
358 \f
359 static bool parse_delimited_span (const struct data_parser *,
360                                   struct dfm_reader *, struct ccase *);
361 static bool parse_delimited_no_span (const struct data_parser *,
362                                      struct dfm_reader *, struct ccase *);
363 static bool parse_fixed (const struct data_parser *,
364                          struct dfm_reader *, struct ccase *);
365
366 /* Reads a case from DFM into C, parsing it with PARSER.  Returns
367    true if successful, false at end of file or on I/O error.
368
369    Case C must not be shared. */
370 bool
371 data_parser_parse (struct data_parser *parser, struct dfm_reader *reader,
372                    struct ccase *c)
373 {
374   bool retval;
375
376   assert (!case_is_shared (c));
377   assert (data_parser_any_fields (parser));
378
379   /* Skip the requested number of records before reading the
380      first case. */
381   for (; parser->skip_records > 0; parser->skip_records--)
382     {
383       if (dfm_eof (reader))
384         return false;
385       dfm_forward_record (reader);
386     }
387
388   /* Limit cases. */
389   if (parser->type == DP_DELIMITED)
390     {
391       if (parser->span)
392         retval = parse_delimited_span (parser, reader, c);
393       else
394         retval = parse_delimited_no_span (parser, reader, c);
395     }
396   else
397     retval = parse_fixed (parser, reader, c);
398
399   return retval;
400 }
401
402 /* Extracts a delimited field from the current position in the
403    current record according to PARSER, reading data from READER.
404
405    *FIELD is set to the field content.  The caller must not or
406    destroy this constant string.
407
408    Sets *FIRST_COLUMN to the 1-based column number of the start of
409    the extracted field, and *LAST_COLUMN to the end of the extracted
410    field.
411
412    Returns true on success, false on failure. */
413 static bool
414 cut_field (const struct data_parser *parser, struct dfm_reader *reader,
415            int *first_column, int *last_column, struct string *tmp,
416            struct substring *field)
417 {
418   size_t length_before_separators;
419   struct substring line, p;
420   bool quoted;
421
422   if (dfm_eof (reader))
423     return false;
424   if (ss_is_empty (parser->hard_seps))
425     dfm_expand_tabs (reader);
426   line = p = dfm_get_record (reader);
427
428   /* Skip leading soft separators. */
429   ss_ltrim (&p, parser->soft_seps);
430
431   /* Handle empty or completely consumed lines. */
432   if (ss_is_empty (p))
433     {
434       if (!parser->empty_line_has_field || dfm_columns_past_end (reader) > 0)
435         return false;
436       else
437         {
438           *field = p;
439           *first_column = dfm_column_start (reader);
440           *last_column = *first_column + 1;
441           dfm_forward_columns (reader, 1);
442           return true;
443         }
444     }
445
446   *first_column = dfm_column_start (reader);
447   quoted = ss_find_byte (parser->quotes, ss_first (p)) != SIZE_MAX;
448   if (quoted)
449     {
450       /* Quoted field. */
451       int quote = ss_get_byte (&p);
452       if (!ss_get_until (&p, quote, field))
453         msg (DW, _("Quoted string extends beyond end of line."));
454       if (parser->quote_escape && ss_first (p) == quote)
455         {
456           ds_assign_substring (tmp, *field);
457           while (ss_match_byte (&p, quote))
458             {
459               struct substring ss;
460               ds_put_byte (tmp, quote);
461               if (!ss_get_until (&p, quote, &ss))
462                 msg (DW, _("Quoted string extends beyond end of line."));
463               ds_put_substring (tmp, ss);
464             }
465           *field = ds_ss (tmp);
466         }
467       *last_column = *first_column + (ss_length (line) - ss_length (p));
468     }
469   else
470     {
471       /* Regular field. */
472       ss_get_bytes (&p, ss_cspan (p, ds_ss (&parser->any_sep)), field);
473       *last_column = *first_column + ss_length (*field);
474     }
475
476   /* Skip trailing soft separator and a single hard separator if present. */
477   length_before_separators = ss_length (p);
478   ss_ltrim (&p, parser->soft_seps);
479   if (!ss_is_empty (p)
480       && ss_find_byte (parser->hard_seps, ss_first (p)) != SIZE_MAX)
481     {
482       ss_advance (&p, 1);
483       ss_ltrim (&p, parser->soft_seps);
484     }
485   if (ss_is_empty (p))
486     dfm_forward_columns (reader, 1);
487   else if (quoted && length_before_separators == ss_length (p))
488     msg (DW, _("Missing delimiter following quoted string."));
489   dfm_forward_columns (reader, ss_length (line) - ss_length (p));
490
491   return true;
492 }
493
494 static void
495 parse_error (const struct dfm_reader *reader, const struct field *field,
496              int first_column, int last_column, char *error)
497 {
498   struct msg m = {
499     .category = MSG_C_DATA,
500     .severity = MSG_S_WARNING,
501     .file_name = CONST_CAST (char *, dfm_get_file_name (reader)),
502     .first_line = dfm_get_line_number (reader),
503     .last_line = m.first_line + 1,
504     .first_column = first_column,
505     .last_column = last_column,
506     .text = xasprintf (_("Data for variable %s is not valid as format %s: %s"),
507                        field->name, fmt_name (field->format.type), error),
508   };
509   msg_emit (&m);
510
511   free (error);
512 }
513
514 /* Reads a case from READER into C, parsing it according to
515    fixed-format syntax rules in PARSER.
516    Returns true if successful, false at end of file or on I/O error. */
517 static bool
518 parse_fixed (const struct data_parser *parser, struct dfm_reader *reader,
519              struct ccase *c)
520 {
521   const char *input_encoding = dfm_reader_get_encoding (reader);
522   const char *output_encoding = dict_get_encoding (parser->dict);
523   struct field *f;
524   int row;
525
526   if (dfm_eof (reader))
527     return false;
528
529   f = parser->fields;
530   for (row = 1; row <= parser->records_per_case; row++)
531     {
532       struct substring line;
533
534       if (dfm_eof (reader))
535         {
536           msg (DW, _("Partial case of %d of %d records discarded."),
537                row - 1, parser->records_per_case);
538           return false;
539         }
540       dfm_expand_tabs (reader);
541       line = dfm_get_record (reader);
542
543       for (; f < &parser->fields[parser->field_cnt] && f->record == row; f++)
544         {
545           struct substring s = ss_substr (line, f->first_column - 1,
546                                           f->format.w);
547           union value *value = case_data_rw_idx (c, f->case_idx);
548           char *error = data_in (s, input_encoding, f->format.type,
549                                  value, fmt_var_width (&f->format),
550                                  output_encoding);
551
552           if (error == NULL)
553             data_in_imply_decimals (s, input_encoding, f->format.type,
554                                     f->format.d, value);
555           else
556             parse_error (reader, f, f->first_column,
557                          f->first_column + f->format.w, error);
558         }
559
560       dfm_forward_record (reader);
561     }
562
563   return true;
564 }
565
566 /* Reads a case from READER into C, parsing it according to
567    free-format syntax rules in PARSER.
568    Returns true if successful, false at end of file or on I/O error. */
569 static bool
570 parse_delimited_span (const struct data_parser *parser,
571                       struct dfm_reader *reader, struct ccase *c)
572 {
573   const char *output_encoding = dict_get_encoding (parser->dict);
574   struct string tmp = DS_EMPTY_INITIALIZER;
575   struct field *f;
576
577   for (f = parser->fields; f < &parser->fields[parser->field_cnt]; f++)
578     {
579       struct substring s;
580       int first_column, last_column;
581       char *error;
582
583       /* Cut out a field and read in a new record if necessary. */
584       while (!cut_field (parser, reader,
585                          &first_column, &last_column, &tmp, &s))
586         {
587           if (!dfm_eof (reader))
588             dfm_forward_record (reader);
589           if (dfm_eof (reader))
590             {
591               if (f > parser->fields)
592                 msg (DW, _("Partial case discarded.  The first variable "
593                            "missing was %s."), f->name);
594               ds_destroy (&tmp);
595               return false;
596             }
597         }
598
599       const char *input_encoding = dfm_reader_get_encoding (reader);
600       error = data_in (s, input_encoding, f->format.type,
601                        case_data_rw_idx (c, f->case_idx),
602                        fmt_var_width (&f->format), output_encoding);
603       if (error != NULL)
604         parse_error (reader, f, first_column, last_column, error);
605     }
606   ds_destroy (&tmp);
607   return true;
608 }
609
610 /* Reads a case from READER into C, parsing it according to
611    delimited syntax rules with one case per record in PARSER.
612    Returns true if successful, false at end of file or on I/O error. */
613 static bool
614 parse_delimited_no_span (const struct data_parser *parser,
615                          struct dfm_reader *reader, struct ccase *c)
616 {
617   const char *output_encoding = dict_get_encoding (parser->dict);
618   struct string tmp = DS_EMPTY_INITIALIZER;
619   struct substring s;
620   struct field *f, *end;
621
622   if (dfm_eof (reader))
623     return false;
624
625   end = &parser->fields[parser->field_cnt];
626   for (f = parser->fields; f < end; f++)
627     {
628       int first_column, last_column;
629       char *error;
630
631       if (!cut_field (parser, reader, &first_column, &last_column, &tmp, &s))
632         {
633           if (f < end - 1 && settings_get_undefined () && parser->warn_missing_fields)
634             msg (DW, _("Missing value(s) for all variables from %s onward.  "
635                        "These will be filled with the system-missing value "
636                        "or blanks, as appropriate."),
637                  f->name);
638           for (; f < end; f++)
639             value_set_missing (case_data_rw_idx (c, f->case_idx),
640                                fmt_var_width (&f->format));
641           goto exit;
642         }
643
644       const char *input_encoding = dfm_reader_get_encoding (reader);
645       error = data_in (s, input_encoding, f->format.type,
646                        case_data_rw_idx (c, f->case_idx),
647                        fmt_var_width (&f->format), output_encoding);
648       if (error != NULL)
649         parse_error (reader, f, first_column, last_column, error);
650     }
651
652   s = dfm_get_record (reader);
653   ss_ltrim (&s, parser->soft_seps);
654   if (!ss_is_empty (s))
655     msg (DW, _("Record ends in data not part of any field."));
656
657 exit:
658   dfm_forward_record (reader);
659   ds_destroy (&tmp);
660   return true;
661 }
662 \f
663 /* Displays a table giving information on fixed-format variable
664    parsing on DATA LIST. */
665 static void
666 dump_fixed_table (const struct data_parser *parser,
667                   const struct file_handle *fh)
668 {
669   /* XXX This should not be preformatted. */
670   char *title = xasprintf (ngettext ("Reading %d record from %s.",
671                                      "Reading %d records from %s.",
672                                      parser->records_per_case),
673                            parser->records_per_case, fh_get_name (fh));
674   struct pivot_table *table = pivot_table_create__ (
675     pivot_value_new_user_text (title, -1), "Fixed Data Records");
676   free (title);
677
678   pivot_dimension_create (
679     table, PIVOT_AXIS_COLUMN, N_("Attributes"),
680     N_("Record"), N_("Columns"), N_("Format"));
681
682   struct pivot_dimension *variables = pivot_dimension_create (
683     table, PIVOT_AXIS_ROW, N_("Variable"));
684   variables->root->show_label = true;
685   for (size_t i = 0; i < parser->field_cnt; i++)
686     {
687       struct field *f = &parser->fields[i];
688
689       /* XXX It would be better to have the actual variable here. */
690       int variable_idx = pivot_category_create_leaf (
691         variables->root, pivot_value_new_user_text (f->name, -1));
692
693       pivot_table_put2 (table, 0, variable_idx,
694                         pivot_value_new_integer (f->record));
695
696       int first_column = f->first_column;
697       int last_column = f->first_column + f->format.w - 1;
698       char *columns = xasprintf ("%3d-%3d", first_column, last_column);
699       pivot_table_put2 (table, 1, variable_idx,
700                         pivot_value_new_user_text (columns, -1));
701       free (columns);
702
703       char str[FMT_STRING_LEN_MAX + 1];
704       pivot_table_put2 (table, 2, variable_idx,
705                         pivot_value_new_user_text (
706                           fmt_to_string (&f->format, str), -1));
707
708     }
709
710   pivot_table_submit (table);
711 }
712
713 /* Displays a table giving information on free-format variable parsing
714    on DATA LIST. */
715 static void
716 dump_delimited_table (const struct data_parser *parser,
717                       const struct file_handle *fh)
718 {
719   struct pivot_table *table = pivot_table_create__ (
720     pivot_value_new_text_format (N_("Reading free-form data from %s."),
721                                  fh_get_name (fh)),
722     "Free-Form Data Records");
723
724   pivot_dimension_create (
725     table, PIVOT_AXIS_COLUMN, N_("Attributes"), N_("Format"));
726
727   struct pivot_dimension *variables = pivot_dimension_create (
728     table, PIVOT_AXIS_ROW, N_("Variable"));
729   variables->root->show_label = true;
730   for (size_t i = 0; i < parser->field_cnt; i++)
731     {
732       struct field *f = &parser->fields[i];
733
734       /* XXX It would be better to have the actual variable here. */
735       int variable_idx = pivot_category_create_leaf (
736         variables->root, pivot_value_new_user_text (f->name, -1));
737
738       char str[FMT_STRING_LEN_MAX + 1];
739       pivot_table_put2 (table, 0, variable_idx,
740                         pivot_value_new_user_text (
741                           fmt_to_string (&f->format, str), -1));
742     }
743
744   pivot_table_submit (table);
745 }
746
747 /* Displays a table giving information on how PARSER will read
748    data from FH. */
749 void
750 data_parser_output_description (struct data_parser *parser,
751                                 const struct file_handle *fh)
752 {
753   if (parser->type == DP_FIXED)
754     dump_fixed_table (parser, fh);
755   else
756     dump_delimited_table (parser, fh);
757 }
758 \f
759 /* Data parser input program. */
760 struct data_parser_casereader
761   {
762     struct data_parser *parser; /* Parser. */
763     struct dfm_reader *reader;  /* Data file reader. */
764     struct caseproto *proto;    /* Format of cases. */
765   };
766
767 static const struct casereader_class data_parser_casereader_class;
768
769 /* Replaces DS's active dataset by an input program that reads data
770    from READER according to the rules in PARSER, using DICT as
771    the underlying dictionary.  Ownership of PARSER and READER is
772    transferred to the input program, and ownership of DICT is
773    transferred to the dataset. */
774 void
775 data_parser_make_active_file (struct data_parser *parser, struct dataset *ds,
776                                struct dfm_reader *reader,
777                                struct dictionary *dict,
778                                struct casereader* (*func)(struct casereader *,
779                                                           const struct dictionary *,
780                                                           void *),
781                                void *ud)
782 {
783   struct data_parser_casereader *r;
784   struct casereader *casereader0;
785   struct casereader *casereader1;
786
787   r = xmalloc (sizeof *r);
788   r->parser = parser;
789   r->reader = reader;
790   r->proto = caseproto_ref (dict_get_proto (dict));
791   casereader0 = casereader_create_sequential (NULL, r->proto,
792                                              CASENUMBER_MAX,
793                                              &data_parser_casereader_class, r);
794
795   if (func)
796     casereader1 = func (casereader0, dict, ud);
797   else
798     casereader1 = casereader0;
799
800   dataset_set_dict (ds, dict);
801   dataset_set_source (ds, casereader1);
802 }
803
804
805 static struct ccase *
806 data_parser_casereader_read (struct casereader *reader UNUSED, void *r_)
807 {
808   struct data_parser_casereader *r = r_;
809   struct ccase *c = case_create (r->proto);
810   if (data_parser_parse (r->parser, r->reader, c))
811     return c;
812   else
813     {
814       case_unref (c);
815       return NULL;
816     }
817 }
818
819 static void
820 data_parser_casereader_destroy (struct casereader *reader, void *r_)
821 {
822   struct data_parser_casereader *r = r_;
823   if (dfm_reader_error (r->reader))
824     casereader_force_error (reader);
825   dfm_close_reader (r->reader);
826   caseproto_unref (r->proto);
827   data_parser_destroy (r->parser);
828   free (r);
829 }
830
831 static const struct casereader_class data_parser_casereader_class =
832   {
833     data_parser_casereader_read,
834     data_parser_casereader_destroy,
835     NULL,
836     NULL,
837   };