1d8a493e0db49e54b96f0d1063fd5ab69f6bc95f
[pspp] / src / language / data-io / data-parser.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013, 2016 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "language/data-io/data-parser.h"
20
21 #include <stdint.h>
22 #include <stdlib.h>
23
24 #include "data/casereader-provider.h"
25 #include "data/data-in.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/file-handle-def.h"
30 #include "data/settings.h"
31 #include "language/data-io/data-reader.h"
32 #include "libpspp/message.h"
33 #include "libpspp/str.h"
34 #include "output/tab.h"
35
36 #include "gl/xalloc.h"
37
38 #include "gettext.h"
39 #define _(msgid) gettext (msgid)
40
41 /* Data parser for textual data like that read by DATA LIST. */
42 struct data_parser
43   {
44     const struct dictionary *dict; /*Dictionary of destination */
45     enum data_parser_type type; /* Type of data to parse. */
46     int skip_records;           /* Records to skip before first real data. */
47
48     struct field *fields;       /* Fields to parse. */
49     size_t field_cnt;           /* Number of fields. */
50     size_t field_allocated;     /* Number of fields spaced allocated for. */
51
52     /* DP_DELIMITED parsers only. */
53     bool span;                  /* May cases span multiple records? */
54     bool empty_line_has_field;  /* Does an empty line have an (empty) field? */
55     bool warn_missing_fields;   /* Should missing fields be considered errors? */
56     struct substring quotes;    /* Characters that can quote separators. */
57     bool quote_escape;          /* Doubled quote acts as escape? */
58     struct substring soft_seps; /* Two soft separators act like just one. */
59     struct substring hard_seps; /* Two hard separators yield empty fields. */
60     struct string any_sep;      /* Concatenation of soft_seps and hard_seps. */
61
62     /* DP_FIXED parsers only. */
63     int records_per_case;       /* Number of records in each case. */
64   };
65
66 /* How to parse one variable. */
67 struct field
68   {
69     struct fmt_spec format;     /* Input format of this field. */
70     int case_idx;               /* First value in case. */
71     char *name;                 /* Var name for error messages and tables. */
72
73     /* DP_FIXED only. */
74     int record;                 /* Record number (1-based). */
75     int first_column;           /* First column in record (1-based). */
76   };
77
78 static void set_any_sep (struct data_parser *parser);
79
80 /* Creates and returns a new data parser. */
81 struct data_parser *
82 data_parser_create (const struct dictionary *dict)
83 {
84   struct data_parser *parser = xmalloc (sizeof *parser);
85
86   parser->type = DP_FIXED;
87   parser->skip_records = 0;
88
89   parser->fields = NULL;
90   parser->field_cnt = 0;
91   parser->field_allocated = 0;
92   parser->dict = dict;
93
94   parser->span = true;
95   parser->empty_line_has_field = false;
96   parser->warn_missing_fields = true;
97   ss_alloc_substring (&parser->quotes, ss_cstr ("\"'"));
98   parser->quote_escape = false;
99   ss_alloc_substring (&parser->soft_seps, ss_cstr (CC_SPACES));
100   ss_alloc_substring (&parser->hard_seps, ss_cstr (","));
101   ds_init_empty (&parser->any_sep);
102   set_any_sep (parser);
103
104   parser->records_per_case = 0;
105
106   return parser;
107 }
108
109 /* Destroys PARSER. */
110 void
111 data_parser_destroy (struct data_parser *parser)
112 {
113   if (parser != NULL)
114     {
115       size_t i;
116
117       for (i = 0; i < parser->field_cnt; i++)
118         free (parser->fields[i].name);
119       free (parser->fields);
120       ss_dealloc (&parser->quotes);
121       ss_dealloc (&parser->soft_seps);
122       ss_dealloc (&parser->hard_seps);
123       ds_destroy (&parser->any_sep);
124       free (parser);
125     }
126 }
127
128 /* Returns the type of PARSER (either DP_DELIMITED or DP_FIXED). */
129 enum data_parser_type
130 data_parser_get_type (const struct data_parser *parser)
131 {
132   return parser->type;
133 }
134
135 /* Sets the type of PARSER to TYPE (either DP_DELIMITED or
136    DP_FIXED). */
137 void
138 data_parser_set_type (struct data_parser *parser, enum data_parser_type type)
139 {
140   assert (parser->field_cnt == 0);
141   assert (type == DP_FIXED || type == DP_DELIMITED);
142   parser->type = type;
143 }
144
145 /* Configures PARSER to skip the specified number of
146    INITIAL_RECORDS_TO_SKIP before parsing any data.  By default,
147    no records are skipped. */
148 void
149 data_parser_set_skip (struct data_parser *parser, int initial_records_to_skip)
150 {
151   assert (initial_records_to_skip >= 0);
152   parser->skip_records = initial_records_to_skip;
153 }
154
155 /* Returns true if PARSER is configured to allow cases to span
156    multiple records. */
157 bool
158 data_parser_get_span (const struct data_parser *parser)
159 {
160   return parser->span;
161 }
162
163 /* If MAY_CASES_SPAN_RECORDS is true, configures PARSER to allow
164    a single case to span multiple records and multiple cases to
165    occupy a single record.  If MAY_CASES_SPAN_RECORDS is false,
166    configures PARSER to require each record to contain exactly
167    one case.
168
169    This setting affects parsing of DP_DELIMITED files only. */
170 void
171 data_parser_set_span (struct data_parser *parser, bool may_cases_span_records)
172 {
173   parser->span = may_cases_span_records;
174 }
175
176 /* If EMPTY_LINE_HAS_FIELD is true, configures PARSER to parse an
177    empty line as an empty field and to treat a hard delimiter
178    followed by end-of-line as an empty field.  If
179    EMPTY_LINE_HAS_FIELD is false, PARSER will skip empty lines
180    and hard delimiters at the end of lines without emitting empty
181    fields.
182
183    This setting affects parsing of DP_DELIMITED files only. */
184 void
185 data_parser_set_empty_line_has_field (struct data_parser *parser,
186                                       bool empty_line_has_field)
187 {
188   parser->empty_line_has_field = empty_line_has_field;
189 }
190
191
192 /* If WARN_MISSING_FIELDS is true, configures PARSER to emit a warning
193    and cause an error condition when a missing field is encountered.
194    If  WARN_MISSING_FIELDS is false, PARSER will silently fill such
195    fields with the system missing value.
196
197    This setting affects parsing of DP_DELIMITED files only. */
198 void
199 data_parser_set_warn_missing_fields (struct data_parser *parser,
200                                      bool warn_missing_fields)
201 {
202   parser->warn_missing_fields = warn_missing_fields;
203 }
204
205
206 /* Sets the characters that may be used for quoting field
207    contents to QUOTES.  If QUOTES is empty, quoting will be
208    disabled.
209
210    The caller retains ownership of QUOTES.
211
212    This setting affects parsing of DP_DELIMITED files only. */
213 void
214 data_parser_set_quotes (struct data_parser *parser, struct substring quotes)
215 {
216   ss_dealloc (&parser->quotes);
217   ss_alloc_substring (&parser->quotes, quotes);
218 }
219
220 /* If ESCAPE is false (the default setting), a character used for
221    quoting cannot itself be embedded within a quoted field.  If
222    ESCAPE is true, then a quote character can be embedded within
223    a quoted field by doubling it.
224
225    This setting affects parsing of DP_DELIMITED files only, and
226    only when at least one quote character has been set (with
227    data_parser_set_quotes). */
228 void
229 data_parser_set_quote_escape (struct data_parser *parser, bool escape)
230 {
231   parser->quote_escape = escape;
232 }
233
234 /* Sets PARSER's soft delimiters to DELIMITERS.  Soft delimiters
235    separate fields, but consecutive soft delimiters do not yield
236    empty fields.  (Ordinarily, only white space characters are
237    appropriate soft delimiters.)
238
239    The caller retains ownership of DELIMITERS.
240
241    This setting affects parsing of DP_DELIMITED files only. */
242 void
243 data_parser_set_soft_delimiters (struct data_parser *parser,
244                                  struct substring delimiters)
245 {
246   ss_dealloc (&parser->soft_seps);
247   ss_alloc_substring (&parser->soft_seps, delimiters);
248   set_any_sep (parser);
249 }
250
251 /* Sets PARSER's hard delimiters to DELIMITERS.  Hard delimiters
252    separate fields.  A consecutive pair of hard delimiters yield
253    an empty field.
254
255    The caller retains ownership of DELIMITERS.
256
257    This setting affects parsing of DP_DELIMITED files only. */
258 void
259 data_parser_set_hard_delimiters (struct data_parser *parser,
260                                  struct substring delimiters)
261 {
262   ss_dealloc (&parser->hard_seps);
263   ss_alloc_substring (&parser->hard_seps, delimiters);
264   set_any_sep (parser);
265 }
266
267 /* Returns the number of records per case. */
268 int
269 data_parser_get_records (const struct data_parser *parser)
270 {
271   return parser->records_per_case;
272 }
273
274 /* Sets the number of records per case to RECORDS_PER_CASE.
275
276    This setting affects parsing of DP_FIXED files only. */
277 void
278 data_parser_set_records (struct data_parser *parser, int records_per_case)
279 {
280   assert (records_per_case >= 0);
281   assert (records_per_case >= parser->records_per_case);
282   parser->records_per_case = records_per_case;
283 }
284
285 static void
286 add_field (struct data_parser *p, const struct fmt_spec *format, int case_idx,
287            const char *name, int record, int first_column)
288 {
289   struct field *field;
290
291   if (p->field_cnt == p->field_allocated)
292     p->fields = x2nrealloc (p->fields, &p->field_allocated, sizeof *p->fields);
293   field = &p->fields[p->field_cnt++];
294   field->format = *format;
295   field->case_idx = case_idx;
296   field->name = xstrdup (name);
297   field->record = record;
298   field->first_column = first_column;
299 }
300
301 /* Adds a delimited field to the field parsed by PARSER, which
302    must be configured as a DP_DELIMITED parser.  The field is
303    parsed as input format FORMAT.  Its data will be stored into case
304    index CASE_INDEX.  Errors in input data will be reported
305    against variable NAME. */
306 void
307 data_parser_add_delimited_field (struct data_parser *parser,
308                                  const struct fmt_spec *format, int case_idx,
309                                  const char *name)
310 {
311   assert (parser->type == DP_DELIMITED);
312   add_field (parser, format, case_idx, name, 0, 0);
313 }
314
315 /* Adds a fixed field to the field parsed by PARSER, which
316    must be configured as a DP_FIXED parser.  The field is
317    parsed as input format FORMAT.  Its data will be stored into case
318    index CASE_INDEX.  Errors in input data will be reported
319    against variable NAME.  The field will be drawn from the
320    FORMAT->w columns in 1-based RECORD starting at 1-based
321    column FIRST_COLUMN.
322
323    RECORD must be at least as great as that of any field already
324    added; that is, fields must be added in increasing order of
325    record number.  If RECORD is greater than the current number
326    of records per case, the number of records per case are
327    increased as needed.  */
328 void
329 data_parser_add_fixed_field (struct data_parser *parser,
330                              const struct fmt_spec *format, int case_idx,
331                              const char *name,
332                              int record, int first_column)
333 {
334   assert (parser->type == DP_FIXED);
335   assert (parser->field_cnt == 0
336           || record >= parser->fields[parser->field_cnt - 1].record);
337   if (record > parser->records_per_case)
338     parser->records_per_case = record;
339   add_field (parser, format, case_idx, name, record, first_column);
340 }
341
342 /* Returns true if any fields have been added to PARSER, false
343    otherwise. */
344 bool
345 data_parser_any_fields (const struct data_parser *parser)
346 {
347   return parser->field_cnt > 0;
348 }
349
350 static void
351 set_any_sep (struct data_parser *parser)
352 {
353   ds_assign_substring (&parser->any_sep, parser->soft_seps);
354   ds_put_substring (&parser->any_sep, parser->hard_seps);
355 }
356 \f
357 static bool parse_delimited_span (const struct data_parser *,
358                                   struct dfm_reader *, struct ccase *);
359 static bool parse_delimited_no_span (const struct data_parser *,
360                                      struct dfm_reader *, struct ccase *);
361 static bool parse_fixed (const struct data_parser *,
362                          struct dfm_reader *, struct ccase *);
363
364 /* Reads a case from DFM into C, parsing it with PARSER.  Returns
365    true if successful, false at end of file or on I/O error.
366
367    Case C must not be shared. */
368 bool
369 data_parser_parse (struct data_parser *parser, struct dfm_reader *reader,
370                    struct ccase *c)
371 {
372   bool retval;
373
374   assert (!case_is_shared (c));
375   assert (data_parser_any_fields (parser));
376
377   /* Skip the requested number of records before reading the
378      first case. */
379   for (; parser->skip_records > 0; parser->skip_records--)
380     {
381       if (dfm_eof (reader))
382         return false;
383       dfm_forward_record (reader);
384     }
385
386   /* Limit cases. */
387   if (parser->type == DP_DELIMITED)
388     {
389       if (parser->span)
390         retval = parse_delimited_span (parser, reader, c);
391       else
392         retval = parse_delimited_no_span (parser, reader, c);
393     }
394   else
395     retval = parse_fixed (parser, reader, c);
396
397   return retval;
398 }
399
400 /* Extracts a delimited field from the current position in the
401    current record according to PARSER, reading data from READER.
402
403    *FIELD is set to the field content.  The caller must not or
404    destroy this constant string.
405
406    Sets *FIRST_COLUMN to the 1-based column number of the start of
407    the extracted field, and *LAST_COLUMN to the end of the extracted
408    field.
409
410    Returns true on success, false on failure. */
411 static bool
412 cut_field (const struct data_parser *parser, struct dfm_reader *reader,
413            int *first_column, int *last_column, struct string *tmp,
414            struct substring *field)
415 {
416   size_t length_before_separators;
417   struct substring line, p;
418   bool quoted;
419
420   if (dfm_eof (reader))
421     return false;
422   if (ss_is_empty (parser->hard_seps))
423     dfm_expand_tabs (reader);
424   line = p = dfm_get_record (reader);
425
426   /* Skip leading soft separators. */
427   ss_ltrim (&p, parser->soft_seps);
428
429   /* Handle empty or completely consumed lines. */
430   if (ss_is_empty (p))
431     {
432       if (!parser->empty_line_has_field || dfm_columns_past_end (reader) > 0)
433         return false;
434       else
435         {
436           *field = p;
437           *first_column = dfm_column_start (reader);
438           *last_column = *first_column + 1;
439           dfm_forward_columns (reader, 1);
440           return true;
441         }
442     }
443
444   *first_column = dfm_column_start (reader);
445   quoted = ss_find_byte (parser->quotes, ss_first (p)) != SIZE_MAX;
446   if (quoted)
447     {
448       /* Quoted field. */
449       int quote = ss_get_byte (&p);
450       if (!ss_get_until (&p, quote, field))
451         msg (DW, _("Quoted string extends beyond end of line."));
452       if (parser->quote_escape && ss_first (p) == quote)
453         {
454           ds_assign_substring (tmp, *field);
455           while (ss_match_byte (&p, quote))
456             {
457               struct substring ss;
458               ds_put_byte (tmp, quote);
459               if (!ss_get_until (&p, quote, &ss))
460                 msg (DW, _("Quoted string extends beyond end of line."));
461               ds_put_substring (tmp, ss);
462             }
463           *field = ds_ss (tmp);
464         }
465       *last_column = *first_column + (ss_length (line) - ss_length (p));
466     }
467   else
468     {
469       /* Regular field. */
470       ss_get_bytes (&p, ss_cspan (p, ds_ss (&parser->any_sep)), field);
471       *last_column = *first_column + ss_length (*field);
472     }
473
474   /* Skip trailing soft separator and a single hard separator if present. */
475   length_before_separators = ss_length (p);
476   ss_ltrim (&p, parser->soft_seps);
477   if (!ss_is_empty (p)
478       && ss_find_byte (parser->hard_seps, ss_first (p)) != SIZE_MAX)
479     {
480       ss_advance (&p, 1);
481       ss_ltrim (&p, parser->soft_seps);
482     }
483   if (ss_is_empty (p))
484     dfm_forward_columns (reader, 1);
485   else if (quoted && length_before_separators == ss_length (p))
486     msg (DW, _("Missing delimiter following quoted string."));
487   dfm_forward_columns (reader, ss_length (line) - ss_length (p));
488
489   return true;
490 }
491
492 static void
493 parse_error (const struct dfm_reader *reader, const struct field *field,
494              int first_column, int last_column, char *error)
495 {
496   struct msg m = {
497     .category = MSG_C_DATA,
498     .severity = MSG_S_WARNING,
499     .file_name = CONST_CAST (char *, dfm_get_file_name (reader)),
500     .first_line = dfm_get_line_number (reader),
501     .last_line = m.first_line + 1,
502     .first_column = first_column,
503     .last_column = last_column,
504     .text = xasprintf (_("Data for variable %s is not valid as format %s: %s"),
505                        field->name, fmt_name (field->format.type), error),
506   };
507   msg_emit (&m);
508
509   free (error);
510 }
511
512 /* Reads a case from READER into C, parsing it according to
513    fixed-format syntax rules in PARSER.
514    Returns true if successful, false at end of file or on I/O error. */
515 static bool
516 parse_fixed (const struct data_parser *parser, struct dfm_reader *reader,
517              struct ccase *c)
518 {
519   const char *input_encoding = dfm_reader_get_encoding (reader);
520   const char *output_encoding = dict_get_encoding (parser->dict);
521   struct field *f;
522   int row;
523
524   if (dfm_eof (reader))
525     return false;
526
527   f = parser->fields;
528   for (row = 1; row <= parser->records_per_case; row++)
529     {
530       struct substring line;
531
532       if (dfm_eof (reader))
533         {
534           msg (DW, _("Partial case of %d of %d records discarded."),
535                row - 1, parser->records_per_case);
536           return false;
537         }
538       dfm_expand_tabs (reader);
539       line = dfm_get_record (reader);
540
541       for (; f < &parser->fields[parser->field_cnt] && f->record == row; f++)
542         {
543           struct substring s = ss_substr (line, f->first_column - 1,
544                                           f->format.w);
545           union value *value = case_data_rw_idx (c, f->case_idx);
546           char *error = data_in (s, input_encoding, f->format.type,
547                                  value, fmt_var_width (&f->format),
548                                  output_encoding);
549
550           if (error == NULL)
551             data_in_imply_decimals (s, input_encoding, f->format.type,
552                                     f->format.d, value);
553           else
554             parse_error (reader, f, f->first_column,
555                          f->first_column + f->format.w, error);
556         }
557
558       dfm_forward_record (reader);
559     }
560
561   return true;
562 }
563
564 /* Reads a case from READER into C, parsing it according to
565    free-format syntax rules in PARSER.
566    Returns true if successful, false at end of file or on I/O error. */
567 static bool
568 parse_delimited_span (const struct data_parser *parser,
569                       struct dfm_reader *reader, struct ccase *c)
570 {
571   const char *output_encoding = dict_get_encoding (parser->dict);
572   struct string tmp = DS_EMPTY_INITIALIZER;
573   struct field *f;
574
575   for (f = parser->fields; f < &parser->fields[parser->field_cnt]; f++)
576     {
577       struct substring s;
578       int first_column, last_column;
579       char *error;
580
581       /* Cut out a field and read in a new record if necessary. */
582       while (!cut_field (parser, reader,
583                          &first_column, &last_column, &tmp, &s))
584         {
585           if (!dfm_eof (reader))
586             dfm_forward_record (reader);
587           if (dfm_eof (reader))
588             {
589               if (f > parser->fields)
590                 msg (DW, _("Partial case discarded.  The first variable "
591                            "missing was %s."), f->name);
592               ds_destroy (&tmp);
593               return false;
594             }
595         }
596
597       const char *input_encoding = dfm_reader_get_encoding (reader);
598       error = data_in (s, input_encoding, f->format.type,
599                        case_data_rw_idx (c, f->case_idx),
600                        fmt_var_width (&f->format), output_encoding);
601       if (error != NULL)
602         parse_error (reader, f, first_column, last_column, error);
603     }
604   ds_destroy (&tmp);
605   return true;
606 }
607
608 /* Reads a case from READER into C, parsing it according to
609    delimited syntax rules with one case per record in PARSER.
610    Returns true if successful, false at end of file or on I/O error. */
611 static bool
612 parse_delimited_no_span (const struct data_parser *parser,
613                          struct dfm_reader *reader, struct ccase *c)
614 {
615   const char *output_encoding = dict_get_encoding (parser->dict);
616   struct string tmp = DS_EMPTY_INITIALIZER;
617   struct substring s;
618   struct field *f, *end;
619
620   if (dfm_eof (reader))
621     return false;
622
623   end = &parser->fields[parser->field_cnt];
624   for (f = parser->fields; f < end; f++)
625     {
626       int first_column, last_column;
627       char *error;
628
629       if (!cut_field (parser, reader, &first_column, &last_column, &tmp, &s))
630         {
631           if (f < end - 1 && settings_get_undefined () && parser->warn_missing_fields)
632             msg (DW, _("Missing value(s) for all variables from %s onward.  "
633                        "These will be filled with the system-missing value "
634                        "or blanks, as appropriate."),
635                  f->name);
636           for (; f < end; f++)
637             value_set_missing (case_data_rw_idx (c, f->case_idx),
638                                fmt_var_width (&f->format));
639           goto exit;
640         }
641
642       const char *input_encoding = dfm_reader_get_encoding (reader);
643       error = data_in (s, input_encoding, f->format.type,
644                        case_data_rw_idx (c, f->case_idx),
645                        fmt_var_width (&f->format), output_encoding);
646       if (error != NULL)
647         parse_error (reader, f, first_column, last_column, error);
648     }
649
650   s = dfm_get_record (reader);
651   ss_ltrim (&s, parser->soft_seps);
652   if (!ss_is_empty (s))
653     msg (DW, _("Record ends in data not part of any field."));
654
655 exit:
656   dfm_forward_record (reader);
657   ds_destroy (&tmp);
658   return true;
659 }
660 \f
661 /* Displays a table giving information on fixed-format variable
662    parsing on DATA LIST. */
663 static void
664 dump_fixed_table (const struct data_parser *parser,
665                   const struct file_handle *fh)
666 {
667   struct tab_table *t;
668   size_t i;
669
670   t = tab_create (4, parser->field_cnt + 1);
671   tab_headers (t, 0, 0, 1, 0);
672   tab_text (t, 0, 0, TAB_CENTER | TAT_TITLE, _("Variable"));
673   tab_text (t, 1, 0, TAB_CENTER | TAT_TITLE, _("Record"));
674   tab_text (t, 2, 0, TAB_CENTER | TAT_TITLE, _("Columns"));
675   tab_text (t, 3, 0, TAB_CENTER | TAT_TITLE, _("Format"));
676   tab_box (t, TAL_1, TAL_1, TAL_0, TAL_1, 0, 0, 3, parser->field_cnt);
677   tab_hline (t, TAL_2, 0, 3, 1);
678
679   for (i = 0; i < parser->field_cnt; i++)
680     {
681       struct field *f = &parser->fields[i];
682       char fmt_string[FMT_STRING_LEN_MAX + 1];
683       int row = i + 1;
684
685       tab_text (t, 0, row, TAB_LEFT, f->name);
686       tab_text_format (t, 1, row, 0, "%d", f->record);
687       tab_text_format (t, 2, row, 0, "%3d-%3d",
688                        f->first_column, f->first_column + f->format.w - 1);
689       tab_text (t, 3, row, TAB_LEFT | TAB_FIX,
690                 fmt_to_string (&f->format, fmt_string));
691     }
692
693   tab_title (t, ngettext ("Reading %d record from %s.",
694                           "Reading %d records from %s.",
695                           parser->records_per_case),
696              parser->records_per_case, fh_get_name (fh));
697   tab_submit (t);
698 }
699
700 /* Displays a table giving information on free-format variable parsing
701    on DATA LIST. */
702 static void
703 dump_delimited_table (const struct data_parser *parser,
704                       const struct file_handle *fh)
705 {
706   struct tab_table *t;
707   size_t i;
708
709   t = tab_create (2, parser->field_cnt + 1);
710   tab_headers (t, 0, 0, 1, 0);
711   tab_text (t, 0, 0, TAB_CENTER | TAT_TITLE, _("Variable"));
712   tab_text (t, 1, 0, TAB_CENTER | TAT_TITLE, _("Format"));
713   tab_box (t, TAL_1, TAL_1, TAL_0, TAL_1, 0, 0, 1, parser->field_cnt);
714   tab_hline (t, TAL_2, 0, 1, 1);
715
716   for (i = 0; i < parser->field_cnt; i++)
717     {
718       struct field *f = &parser->fields[i];
719       char str[FMT_STRING_LEN_MAX + 1];
720       int row = i + 1;
721
722       tab_text (t, 0, row, TAB_LEFT, f->name);
723       tab_text (t, 1, row, TAB_LEFT | TAB_FIX,
724                 fmt_to_string (&f->format, str));
725     }
726
727   tab_title (t, _("Reading free-form data from %s."), fh_get_name (fh));
728
729   tab_submit (t);
730 }
731
732 /* Displays a table giving information on how PARSER will read
733    data from FH. */
734 void
735 data_parser_output_description (struct data_parser *parser,
736                                 const struct file_handle *fh)
737 {
738   if (parser->type == DP_FIXED)
739     dump_fixed_table (parser, fh);
740   else
741     dump_delimited_table (parser, fh);
742 }
743 \f
744 /* Data parser input program. */
745 struct data_parser_casereader
746   {
747     struct data_parser *parser; /* Parser. */
748     struct dfm_reader *reader;  /* Data file reader. */
749     struct caseproto *proto;    /* Format of cases. */
750   };
751
752 static const struct casereader_class data_parser_casereader_class;
753
754 /* Replaces DS's active dataset by an input program that reads data
755    from READER according to the rules in PARSER, using DICT as
756    the underlying dictionary.  Ownership of PARSER and READER is
757    transferred to the input program, and ownership of DICT is
758    transferred to the dataset. */
759 void
760 data_parser_make_active_file (struct data_parser *parser, struct dataset *ds,
761                                struct dfm_reader *reader,
762                                struct dictionary *dict,
763                                struct casereader* (*func)(struct casereader *,
764                                                           const struct dictionary *,
765                                                           void *),
766                                void *ud)
767 {
768   struct data_parser_casereader *r;
769   struct casereader *casereader0;
770   struct casereader *casereader1;
771
772   r = xmalloc (sizeof *r);
773   r->parser = parser;
774   r->reader = reader;
775   r->proto = caseproto_ref (dict_get_proto (dict));
776   casereader0 = casereader_create_sequential (NULL, r->proto,
777                                              CASENUMBER_MAX,
778                                              &data_parser_casereader_class, r);
779
780   if (func)
781     casereader1 = func (casereader0, dict, ud);
782   else
783     casereader1 = casereader0;
784
785   dataset_set_dict (ds, dict);
786   dataset_set_source (ds, casereader1);
787 }
788
789
790 static struct ccase *
791 data_parser_casereader_read (struct casereader *reader UNUSED, void *r_)
792 {
793   struct data_parser_casereader *r = r_;
794   struct ccase *c = case_create (r->proto);
795   if (data_parser_parse (r->parser, r->reader, c))
796     return c;
797   else
798     {
799       case_unref (c);
800       return NULL;
801     }
802 }
803
804 static void
805 data_parser_casereader_destroy (struct casereader *reader UNUSED, void *r_)
806 {
807   struct data_parser_casereader *r = r_;
808   if (dfm_reader_error (r->reader))
809     casereader_force_error (reader);
810   data_parser_destroy (r->parser);
811   dfm_close_reader (r->reader);
812   caseproto_unref (r->proto);
813   free (r);
814 }
815
816 static const struct casereader_class data_parser_casereader_class =
817   {
818     data_parser_casereader_read,
819     data_parser_casereader_destroy,
820     NULL,
821     NULL,
822   };