DATA LIST FREE: Warn when a quoted string is not followed by a delimiter.
[pspp] / src / language / data-io / data-parser.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "language/data-io/data-parser.h"
20
21 #include <stdint.h>
22 #include <stdlib.h>
23
24 #include "data/casereader-provider.h"
25 #include "data/data-in.h"
26 #include "data/dataset.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/file-handle-def.h"
30 #include "data/settings.h"
31 #include "language/data-io/data-reader.h"
32 #include "libpspp/message.h"
33 #include "libpspp/str.h"
34 #include "output/tab.h"
35
36 #include "gl/xalloc.h"
37
38 #include "gettext.h"
39 #define _(msgid) gettext (msgid)
40
41 /* Data parser for textual data like that read by DATA LIST. */
42 struct data_parser
43   {
44     const struct dictionary *dict; /*Dictionary of destination */
45     enum data_parser_type type; /* Type of data to parse. */
46     int skip_records;           /* Records to skip before first real data. */
47     casenumber max_cases;       /* Max number of cases to read. */
48     int percent_cases;          /* Approximate percent of cases to read. */
49
50     struct field *fields;       /* Fields to parse. */
51     size_t field_cnt;           /* Number of fields. */
52     size_t field_allocated;     /* Number of fields spaced allocated for. */
53
54     /* DP_DELIMITED parsers only. */
55     bool span;                  /* May cases span multiple records? */
56     bool empty_line_has_field;  /* Does an empty line have an (empty) field? */
57     struct substring quotes;    /* Characters that can quote separators. */
58     bool quote_escape;          /* Doubled quote acts as escape? */
59     struct substring soft_seps; /* Two soft separators act like just one. */
60     struct substring hard_seps; /* Two hard separators yield empty fields. */
61     struct string any_sep;      /* Concatenation of soft_seps and hard_seps. */
62
63     /* DP_FIXED parsers only. */
64     int records_per_case;       /* Number of records in each case. */
65   };
66
67 /* How to parse one variable. */
68 struct field
69   {
70     struct fmt_spec format;     /* Input format of this field. */
71     int case_idx;               /* First value in case. */
72     char *name;                 /* Var name for error messages and tables. */
73
74     /* DP_FIXED only. */
75     int record;                 /* Record number (1-based). */
76     int first_column;           /* First column in record (1-based). */
77   };
78
79 static void set_any_sep (struct data_parser *parser);
80
81 /* Creates and returns a new data parser. */
82 struct data_parser *
83 data_parser_create (const struct dictionary *dict)
84 {
85   struct data_parser *parser = xmalloc (sizeof *parser);
86
87   parser->type = DP_FIXED;
88   parser->skip_records = 0;
89   parser->max_cases = -1;
90   parser->percent_cases = 100;
91
92   parser->fields = NULL;
93   parser->field_cnt = 0;
94   parser->field_allocated = 0;
95   parser->dict = dict;
96
97   parser->span = true;
98   parser->empty_line_has_field = false;
99   ss_alloc_substring (&parser->quotes, ss_cstr ("\"'"));
100   parser->quote_escape = false;
101   ss_alloc_substring (&parser->soft_seps, ss_cstr (CC_SPACES));
102   ss_alloc_substring (&parser->hard_seps, ss_cstr (","));
103   ds_init_empty (&parser->any_sep);
104   set_any_sep (parser);
105
106   parser->records_per_case = 0;
107
108   return parser;
109 }
110
111 /* Destroys PARSER. */
112 void
113 data_parser_destroy (struct data_parser *parser)
114 {
115   if (parser != NULL)
116     {
117       size_t i;
118
119       for (i = 0; i < parser->field_cnt; i++)
120         free (parser->fields[i].name);
121       free (parser->fields);
122       ss_dealloc (&parser->quotes);
123       ss_dealloc (&parser->soft_seps);
124       ss_dealloc (&parser->hard_seps);
125       ds_destroy (&parser->any_sep);
126       free (parser);
127     }
128 }
129
130 /* Returns the type of PARSER (either DP_DELIMITED or DP_FIXED). */
131 enum data_parser_type
132 data_parser_get_type (const struct data_parser *parser)
133 {
134   return parser->type;
135 }
136
137 /* Sets the type of PARSER to TYPE (either DP_DELIMITED or
138    DP_FIXED). */
139 void
140 data_parser_set_type (struct data_parser *parser, enum data_parser_type type)
141 {
142   assert (parser->field_cnt == 0);
143   assert (type == DP_FIXED || type == DP_DELIMITED);
144   parser->type = type;
145 }
146
147 /* Configures PARSER to skip the specified number of
148    INITIAL_RECORDS_TO_SKIP before parsing any data.  By default,
149    no records are skipped. */
150 void
151 data_parser_set_skip (struct data_parser *parser, int initial_records_to_skip)
152 {
153   assert (initial_records_to_skip >= 0);
154   parser->skip_records = initial_records_to_skip;
155 }
156
157 /* Sets the maximum number of cases parsed by PARSER to
158    MAX_CASES.  The default is -1, meaning no limit. */
159 void
160 data_parser_set_case_limit (struct data_parser *parser, casenumber max_cases)
161 {
162   parser->max_cases = max_cases;
163 }
164
165 /* Sets the percentage of cases that PARSER should read from the
166    input file to PERCENT_CASES.  By default, all cases are
167    read. */
168 void
169 data_parser_set_case_percent (struct data_parser *parser, int percent_cases)
170 {
171   assert (percent_cases >= 0 && percent_cases <= 100);
172   parser->percent_cases = percent_cases;
173 }
174
175 /* Returns true if PARSER is configured to allow cases to span
176    multiple records. */
177 bool
178 data_parser_get_span (const struct data_parser *parser)
179 {
180   return parser->span;
181 }
182
183 /* If MAY_CASES_SPAN_RECORDS is true, configures PARSER to allow
184    a single case to span multiple records and multiple cases to
185    occupy a single record.  If MAY_CASES_SPAN_RECORDS is false,
186    configures PARSER to require each record to contain exactly
187    one case.
188
189    This setting affects parsing of DP_DELIMITED files only. */
190 void
191 data_parser_set_span (struct data_parser *parser, bool may_cases_span_records)
192 {
193   parser->span = may_cases_span_records;
194 }
195
196 /* If EMPTY_LINE_HAS_FIELD is true, configures PARSER to parse an
197    empty line as an empty field and to treat a hard delimiter
198    followed by end-of-line as an empty field.  If
199    EMPTY_LINE_HAS_FIELD is false, PARSER will skip empty lines
200    and hard delimiters at the end of lines without emitting empty
201    fields.
202
203    This setting affects parsing of DP_DELIMITED files only. */
204 void
205 data_parser_set_empty_line_has_field (struct data_parser *parser,
206                                       bool empty_line_has_field)
207 {
208   parser->empty_line_has_field = empty_line_has_field;
209 }
210
211 /* Sets the characters that may be used for quoting field
212    contents to QUOTES.  If QUOTES is empty, quoting will be
213    disabled.
214
215    The caller retains ownership of QUOTES.
216
217    This setting affects parsing of DP_DELIMITED files only. */
218 void
219 data_parser_set_quotes (struct data_parser *parser, struct substring quotes)
220 {
221   ss_dealloc (&parser->quotes);
222   ss_alloc_substring (&parser->quotes, quotes);
223 }
224
225 /* If ESCAPE is false (the default setting), a character used for
226    quoting cannot itself be embedded within a quoted field.  If
227    ESCAPE is true, then a quote character can be embedded within
228    a quoted field by doubling it.
229
230    This setting affects parsing of DP_DELIMITED files only, and
231    only when at least one quote character has been set (with
232    data_parser_set_quotes). */
233 void
234 data_parser_set_quote_escape (struct data_parser *parser, bool escape)
235 {
236   parser->quote_escape = escape;
237 }
238
239 /* Sets PARSER's soft delimiters to DELIMITERS.  Soft delimiters
240    separate fields, but consecutive soft delimiters do not yield
241    empty fields.  (Ordinarily, only white space characters are
242    appropriate soft delimiters.)
243
244    The caller retains ownership of DELIMITERS.
245
246    This setting affects parsing of DP_DELIMITED files only. */
247 void
248 data_parser_set_soft_delimiters (struct data_parser *parser,
249                                  struct substring delimiters)
250 {
251   ss_dealloc (&parser->soft_seps);
252   ss_alloc_substring (&parser->soft_seps, delimiters);
253   set_any_sep (parser);
254 }
255
256 /* Sets PARSER's hard delimiters to DELIMITERS.  Hard delimiters
257    separate fields.  A consecutive pair of hard delimiters yield
258    an empty field.
259
260    The caller retains ownership of DELIMITERS.
261
262    This setting affects parsing of DP_DELIMITED files only. */
263 void
264 data_parser_set_hard_delimiters (struct data_parser *parser,
265                                  struct substring delimiters)
266 {
267   ss_dealloc (&parser->hard_seps);
268   ss_alloc_substring (&parser->hard_seps, delimiters);
269   set_any_sep (parser);
270 }
271
272 /* Returns the number of records per case. */
273 int
274 data_parser_get_records (const struct data_parser *parser)
275 {
276   return parser->records_per_case;
277 }
278
279 /* Sets the number of records per case to RECORDS_PER_CASE.
280
281    This setting affects parsing of DP_FIXED files only. */
282 void
283 data_parser_set_records (struct data_parser *parser, int records_per_case)
284 {
285   assert (records_per_case >= 0);
286   assert (records_per_case >= parser->records_per_case);
287   parser->records_per_case = records_per_case;
288 }
289
290 static void
291 add_field (struct data_parser *p, const struct fmt_spec *format, int case_idx,
292            const char *name, int record, int first_column)
293 {
294   struct field *field;
295
296   if (p->field_cnt == p->field_allocated)
297     p->fields = x2nrealloc (p->fields, &p->field_allocated, sizeof *p->fields);
298   field = &p->fields[p->field_cnt++];
299   field->format = *format;
300   field->case_idx = case_idx;
301   field->name = xstrdup (name);
302   field->record = record;
303   field->first_column = first_column;
304 }
305
306 /* Adds a delimited field to the field parsed by PARSER, which
307    must be configured as a DP_DELIMITED parser.  The field is
308    parsed as input format FORMAT.  Its data will be stored into case
309    index CASE_INDEX.  Errors in input data will be reported
310    against variable NAME. */
311 void
312 data_parser_add_delimited_field (struct data_parser *parser,
313                                  const struct fmt_spec *format, int case_idx,
314                                  const char *name)
315 {
316   assert (parser->type == DP_DELIMITED);
317   add_field (parser, format, case_idx, name, 0, 0);
318 }
319
320 /* Adds a fixed field to the field parsed by PARSER, which
321    must be configured as a DP_FIXED parser.  The field is
322    parsed as input format FORMAT.  Its data will be stored into case
323    index CASE_INDEX.  Errors in input data will be reported
324    against variable NAME.  The field will be drawn from the
325    FORMAT->w columns in 1-based RECORD starting at 1-based
326    column FIRST_COLUMN.
327
328    RECORD must be at least as great as that of any field already
329    added; that is, fields must be added in increasing order of
330    record number.  If RECORD is greater than the current number
331    of records per case, the number of records per case are
332    increased as needed.  */
333 void
334 data_parser_add_fixed_field (struct data_parser *parser,
335                              const struct fmt_spec *format, int case_idx,
336                              const char *name,
337                              int record, int first_column)
338 {
339   assert (parser->type == DP_FIXED);
340   assert (parser->field_cnt == 0
341           || record >= parser->fields[parser->field_cnt - 1].record);
342   if (record > parser->records_per_case)
343     parser->records_per_case = record;
344   add_field (parser, format, case_idx, name, record, first_column);
345 }
346
347 /* Returns true if any fields have been added to PARSER, false
348    otherwise. */
349 bool
350 data_parser_any_fields (const struct data_parser *parser)
351 {
352   return parser->field_cnt > 0;
353 }
354
355 static void
356 set_any_sep (struct data_parser *parser)
357 {
358   ds_assign_substring (&parser->any_sep, parser->soft_seps);
359   ds_put_substring (&parser->any_sep, parser->hard_seps);
360 }
361 \f
362 static bool parse_delimited_span (const struct data_parser *,
363                                   struct dfm_reader *, struct ccase *);
364 static bool parse_delimited_no_span (const struct data_parser *,
365                                      struct dfm_reader *, struct ccase *);
366 static bool parse_fixed (const struct data_parser *,
367                          struct dfm_reader *, struct ccase *);
368
369 /* Reads a case from DFM into C, parsing it with PARSER.  Returns
370    true if successful, false at end of file or on I/O error.
371
372    Case C must not be shared. */
373 bool
374 data_parser_parse (struct data_parser *parser, struct dfm_reader *reader,
375                    struct ccase *c)
376 {
377   bool retval;
378
379   assert (!case_is_shared (c));
380   assert (data_parser_any_fields (parser));
381
382   /* Skip the requested number of records before reading the
383      first case. */
384   for (; parser->skip_records > 0; parser->skip_records--)
385     {
386       if (dfm_eof (reader))
387         return false;
388       dfm_forward_record (reader);
389     }
390
391   /* Limit cases. */
392   if (parser->max_cases != -1 && parser->max_cases-- == 0)
393     return false;
394   if (parser->percent_cases < 100
395       && dfm_get_percent_read (reader) >= parser->percent_cases)
396     return false;
397
398   if (parser->type == DP_DELIMITED)
399     {
400       if (parser->span)
401         retval = parse_delimited_span (parser, reader, c);
402       else
403         retval = parse_delimited_no_span (parser, reader, c);
404     }
405   else
406     retval = parse_fixed (parser, reader, c);
407
408   return retval;
409 }
410
411 /* Extracts a delimited field from the current position in the
412    current record according to PARSER, reading data from READER.
413
414    *FIELD is set to the field content.  The caller must not or
415    destroy this constant string.
416
417    After parsing the field, sets the current position in the
418    record to just past the field and any trailing delimiter.
419    Returns 0 on failure or a 1-based column number indicating the
420    beginning of the field on success. */
421 static bool
422 cut_field (const struct data_parser *parser, struct dfm_reader *reader,
423            int *first_column, int *last_column, struct string *tmp,
424            struct substring *field)
425 {
426   struct substring line, p;
427
428   if (dfm_eof (reader))
429     return false;
430   if (ss_is_empty (parser->hard_seps))
431     dfm_expand_tabs (reader);
432   line = p = dfm_get_record (reader);
433
434   /* Skip leading soft separators. */
435   ss_ltrim (&p, parser->soft_seps);
436
437   /* Handle empty or completely consumed lines. */
438   if (ss_is_empty (p))
439     {
440       if (!parser->empty_line_has_field || dfm_columns_past_end (reader) > 0)
441         return false;
442       else
443         {
444           *field = p;
445           *first_column = dfm_column_start (reader);
446           *last_column = *first_column + 1;
447           dfm_forward_columns (reader, 1);
448           return true;
449         }
450     }
451
452   *first_column = dfm_column_start (reader);
453   if (ss_find_byte (parser->quotes, ss_first (p)) != SIZE_MAX)
454     {
455       /* Quoted field. */
456       int quote = ss_get_byte (&p);
457       if (!ss_get_until (&p, quote, field))
458         msg (SW, _("Quoted string extends beyond end of line."));
459       if (parser->quote_escape && ss_first (p) == quote)
460         {
461           ds_assign_substring (tmp, *field);
462           while (ss_match_byte (&p, quote))
463             {
464               struct substring ss;
465               ds_put_byte (tmp, quote);
466               if (!ss_get_until (&p, quote, &ss))
467                 msg (SW, _("Quoted string extends beyond end of line."));
468               ds_put_substring (tmp, ss);
469             }
470           *field = ds_ss (tmp);
471         }
472       *last_column = *first_column + (ss_length (line) - ss_length (p));
473
474       /* Skip trailing soft separator and a single hard separator
475          if present. */
476       if (!ss_is_empty (p))
477         {
478           size_t n_seps = ss_ltrim (&p, parser->soft_seps);
479           if (!ss_is_empty (p)
480               && ss_find_byte (parser->hard_seps, ss_first (p)) != SIZE_MAX)
481             {
482               ss_advance (&p, 1);
483               n_seps++;
484             }
485           if (!n_seps)
486             msg (SW, _("Missing delimiter following quoted string."));
487         }
488     }
489   else
490     {
491       /* Regular field. */
492       ss_get_bytes (&p, ss_cspan (p, ds_ss (&parser->any_sep)), field);
493       *last_column = *first_column + ss_length (*field);
494
495       if (!ss_ltrim (&p, parser->soft_seps) || ss_is_empty (p)
496           || ss_find_byte (parser->hard_seps, p.string[0]) != SIZE_MAX)
497         {
498           /* Advance past a trailing hard separator,
499              regardless of whether one actually existed.  If
500              we "skip" a delimiter that was not actually
501              there, then we will return end-of-line on our
502              next call, which is what we want. */
503           dfm_forward_columns (reader, 1);
504         }
505     }
506   dfm_forward_columns (reader, ss_length (line) - ss_length (p));
507
508   return true;
509 }
510
511 static void
512 parse_error (const struct dfm_reader *reader, const struct field *field,
513              int first_column, int last_column, char *error)
514 {
515   struct msg m;
516
517   m.category = MSG_C_DATA;
518   m.severity = MSG_S_WARNING;
519   m.file_name = CONST_CAST (char *, dfm_get_file_name (reader));
520   m.first_line = dfm_get_line_number (reader);
521   m.last_line = m.first_line + 1;
522   m.first_column = first_column;
523   m.last_column = last_column;
524   m.text = xasprintf (_("Data for variable %s is not valid as format %s: %s"),
525                       field->name, fmt_name (field->format.type), error);
526   msg_emit (&m);
527
528   free (error);
529 }
530
531 /* Reads a case from READER into C, parsing it according to
532    fixed-format syntax rules in PARSER.
533    Returns true if successful, false at end of file or on I/O error. */
534 static bool
535 parse_fixed (const struct data_parser *parser, struct dfm_reader *reader,
536              struct ccase *c)
537 {
538   const char *input_encoding = dfm_reader_get_encoding (reader);
539   const char *output_encoding = dict_get_encoding (parser->dict);
540   struct field *f;
541   int row;
542
543   if (dfm_eof (reader))
544     return false;
545
546   f = parser->fields;
547   for (row = 1; row <= parser->records_per_case; row++)
548     {
549       struct substring line;
550
551       if (dfm_eof (reader))
552         {
553           msg (SW, _("Partial case of %d of %d records discarded."),
554                row - 1, parser->records_per_case);
555           return false;
556         }
557       dfm_expand_tabs (reader);
558       line = dfm_get_record (reader);
559
560       for (; f < &parser->fields[parser->field_cnt] && f->record == row; f++)
561         {
562           struct substring s = ss_substr (line, f->first_column - 1,
563                                           f->format.w);
564           union value *value = case_data_rw_idx (c, f->case_idx);
565           char *error = data_in (s, input_encoding, f->format.type,
566                                  value, fmt_var_width (&f->format),
567                                  output_encoding);
568
569           if (error == NULL)
570             data_in_imply_decimals (s, input_encoding, f->format.type,
571                                     f->format.d, value);
572           else
573             parse_error (reader, f, f->first_column,
574                          f->first_column + f->format.w, error);
575         }
576
577       dfm_forward_record (reader);
578     }
579
580   return true;
581 }
582
583 /* Reads a case from READER into C, parsing it according to
584    free-format syntax rules in PARSER.
585    Returns true if successful, false at end of file or on I/O error. */
586 static bool
587 parse_delimited_span (const struct data_parser *parser,
588                       struct dfm_reader *reader, struct ccase *c)
589 {
590   const char *input_encoding = dfm_reader_get_encoding (reader);
591   const char *output_encoding = dict_get_encoding (parser->dict);
592   struct string tmp = DS_EMPTY_INITIALIZER;
593   struct field *f;
594
595   for (f = parser->fields; f < &parser->fields[parser->field_cnt]; f++)
596     {
597       struct substring s;
598       int first_column, last_column;
599       char *error;
600
601       /* Cut out a field and read in a new record if necessary. */
602       while (!cut_field (parser, reader,
603                          &first_column, &last_column, &tmp, &s))
604         {
605           if (!dfm_eof (reader))
606             dfm_forward_record (reader);
607           if (dfm_eof (reader))
608             {
609               if (f > parser->fields)
610                 msg (SW, _("Partial case discarded.  The first variable "
611                            "missing was %s."), f->name);
612               ds_destroy (&tmp);
613               return false;
614             }
615         }
616
617       error = data_in (s, input_encoding, f->format.type,
618                        case_data_rw_idx (c, f->case_idx),
619                        fmt_var_width (&f->format), output_encoding);
620       if (error != NULL)
621         parse_error (reader, f, first_column, last_column, error);
622     }
623   ds_destroy (&tmp);
624   return true;
625 }
626
627 /* Reads a case from READER into C, parsing it according to
628    delimited syntax rules with one case per record in PARSER.
629    Returns true if successful, false at end of file or on I/O error. */
630 static bool
631 parse_delimited_no_span (const struct data_parser *parser,
632                          struct dfm_reader *reader, struct ccase *c)
633 {
634   const char *input_encoding = dfm_reader_get_encoding (reader);
635   const char *output_encoding = dict_get_encoding (parser->dict);
636   struct string tmp = DS_EMPTY_INITIALIZER;
637   struct substring s;
638   struct field *f, *end;
639
640   if (dfm_eof (reader))
641     return false;
642
643   end = &parser->fields[parser->field_cnt];
644   for (f = parser->fields; f < end; f++)
645     {
646       int first_column, last_column;
647       char *error;
648
649       if (!cut_field (parser, reader, &first_column, &last_column, &tmp, &s))
650         {
651           if (f < end - 1 && settings_get_undefined ())
652             msg (SW, _("Missing value(s) for all variables from %s onward.  "
653                        "These will be filled with the system-missing value "
654                        "or blanks, as appropriate."),
655                  f->name);
656           for (; f < end; f++)
657             value_set_missing (case_data_rw_idx (c, f->case_idx),
658                                fmt_var_width (&f->format));
659           goto exit;
660         }
661
662       error = data_in (s, input_encoding, f->format.type,
663                        case_data_rw_idx (c, f->case_idx),
664                        fmt_var_width (&f->format), output_encoding);
665       if (error != NULL)
666         parse_error (reader, f, first_column, last_column, error);
667     }
668
669   s = dfm_get_record (reader);
670   ss_ltrim (&s, parser->soft_seps);
671   if (!ss_is_empty (s))
672     msg (SW, _("Record ends in data not part of any field."));
673
674 exit:
675   dfm_forward_record (reader);
676   ds_destroy (&tmp);
677   return true;
678 }
679 \f
680 /* Displays a table giving information on fixed-format variable
681    parsing on DATA LIST. */
682 static void
683 dump_fixed_table (const struct data_parser *parser,
684                   const struct file_handle *fh)
685 {
686   struct tab_table *t;
687   size_t i;
688
689   t = tab_create (4, parser->field_cnt + 1);
690   tab_headers (t, 0, 0, 1, 0);
691   tab_text (t, 0, 0, TAB_CENTER | TAT_TITLE, _("Variable"));
692   tab_text (t, 1, 0, TAB_CENTER | TAT_TITLE, _("Record"));
693   tab_text (t, 2, 0, TAB_CENTER | TAT_TITLE, _("Columns"));
694   tab_text (t, 3, 0, TAB_CENTER | TAT_TITLE, _("Format"));
695   tab_box (t, TAL_1, TAL_1, TAL_0, TAL_1, 0, 0, 3, parser->field_cnt);
696   tab_hline (t, TAL_2, 0, 3, 1);
697
698   for (i = 0; i < parser->field_cnt; i++)
699     {
700       struct field *f = &parser->fields[i];
701       char fmt_string[FMT_STRING_LEN_MAX + 1];
702       int row = i + 1;
703
704       tab_text (t, 0, row, TAB_LEFT, f->name);
705       tab_text_format (t, 1, row, 0, "%d", f->record);
706       tab_text_format (t, 2, row, 0, "%3d-%3d",
707                        f->first_column, f->first_column + f->format.w - 1);
708       tab_text (t, 3, row, TAB_LEFT | TAB_FIX,
709                 fmt_to_string (&f->format, fmt_string));
710     }
711
712   tab_title (t, ngettext ("Reading %d record from %s.",
713                           "Reading %d records from %s.",
714                           parser->records_per_case),
715              parser->records_per_case, fh_get_name (fh));
716   tab_submit (t);
717 }
718
719 /* Displays a table giving information on free-format variable parsing
720    on DATA LIST. */
721 static void
722 dump_delimited_table (const struct data_parser *parser,
723                       const struct file_handle *fh)
724 {
725   struct tab_table *t;
726   size_t i;
727
728   t = tab_create (2, parser->field_cnt + 1);
729   tab_headers (t, 0, 0, 1, 0);
730   tab_text (t, 0, 0, TAB_CENTER | TAT_TITLE, _("Variable"));
731   tab_text (t, 1, 0, TAB_CENTER | TAT_TITLE, _("Format"));
732   tab_box (t, TAL_1, TAL_1, TAL_0, TAL_1, 0, 0, 1, parser->field_cnt);
733   tab_hline (t, TAL_2, 0, 1, 1);
734
735   for (i = 0; i < parser->field_cnt; i++)
736     {
737       struct field *f = &parser->fields[i];
738       char str[FMT_STRING_LEN_MAX + 1];
739       int row = i + 1;
740
741       tab_text (t, 0, row, TAB_LEFT, f->name);
742       tab_text (t, 1, row, TAB_LEFT | TAB_FIX,
743                 fmt_to_string (&f->format, str));
744     }
745
746   tab_title (t, _("Reading free-form data from %s."), fh_get_name (fh));
747
748   tab_submit (t);
749 }
750
751 /* Displays a table giving information on how PARSER will read
752    data from FH. */
753 void
754 data_parser_output_description (struct data_parser *parser,
755                                 const struct file_handle *fh)
756 {
757   if (parser->type == DP_FIXED)
758     dump_fixed_table (parser, fh);
759   else
760     dump_delimited_table (parser, fh);
761 }
762 \f
763 /* Data parser input program. */
764 struct data_parser_casereader
765   {
766     struct data_parser *parser; /* Parser. */
767     struct dfm_reader *reader;  /* Data file reader. */
768     struct caseproto *proto;    /* Format of cases. */
769   };
770
771 static const struct casereader_class data_parser_casereader_class;
772
773 /* Replaces DS's active dataset by an input program that reads data
774    from READER according to the rules in PARSER, using DICT as
775    the underlying dictionary.  Ownership of PARSER and READER is
776    transferred to the input program, and ownership of DICT is
777    transferred to the dataset. */
778 void
779 data_parser_make_active_file (struct data_parser *parser, struct dataset *ds,
780                               struct dfm_reader *reader,
781                               struct dictionary *dict)
782 {
783   struct data_parser_casereader *r;
784   struct casereader *casereader;
785
786   r = xmalloc (sizeof *r);
787   r->parser = parser;
788   r->reader = reader;
789   r->proto = caseproto_ref (dict_get_proto (dict));
790   casereader = casereader_create_sequential (NULL, r->proto,
791                                              CASENUMBER_MAX,
792                                              &data_parser_casereader_class, r);
793   dataset_set_dict (ds, dict);
794   dataset_set_source (ds, casereader);
795 }
796
797 static struct ccase *
798 data_parser_casereader_read (struct casereader *reader UNUSED, void *r_)
799 {
800   struct data_parser_casereader *r = r_;
801   struct ccase *c = case_create (r->proto);
802   if (data_parser_parse (r->parser, r->reader, c))
803     return c;
804   else
805     {
806       case_unref (c);
807       return NULL;
808     }
809 }
810
811 static void
812 data_parser_casereader_destroy (struct casereader *reader UNUSED, void *r_)
813 {
814   struct data_parser_casereader *r = r_;
815   if (dfm_reader_error (r->reader))
816     casereader_force_error (reader);
817   data_parser_destroy (r->parser);
818   dfm_close_reader (r->reader);
819   caseproto_unref (r->proto);
820   free (r);
821 }
822
823 static const struct casereader_class data_parser_casereader_class =
824   {
825     data_parser_casereader_read,
826     data_parser_casereader_destroy,
827     NULL,
828     NULL,
829   };