data-parser: Make data parser not require a dictionary.
[pspp] / src / language / data-io / data-parser.c
index 258a42a00153a99ff19313fce9186663adc27bf8..e7896f6bbef1935bfa733bfbaed937035bc91f62 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - a program for statistical analysis.
-   Copyright (C) 2007, 2009 Free Software Foundation, Inc.
+   Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013, 2016 Free Software Foundation, Inc.
 
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
 
 #include <config.h>
 
-#include <language/data-io/data-parser.h>
+#include "language/data-io/data-parser.h"
 
 #include <stdint.h>
 #include <stdlib.h>
 
-#include <data/casereader-provider.h>
-#include <data/data-in.h>
-#include <data/dictionary.h>
-#include <data/format.h>
-#include <data/file-handle-def.h>
-#include <data/procedure.h>
-#include <data/settings.h>
-#include <language/data-io/data-reader.h>
-#include <libpspp/message.h>
-#include <libpspp/str.h>
-#include <output/table.h>
-
-#include "xalloc.h"
+#include "data/casereader-provider.h"
+#include "data/data-in.h"
+#include "data/dataset.h"
+#include "data/dictionary.h"
+#include "data/format.h"
+#include "data/file-handle-def.h"
+#include "data/settings.h"
+#include "language/data-io/data-reader.h"
+#include "libpspp/intern.h"
+#include "libpspp/message.h"
+#include "libpspp/str.h"
+#include "output/pivot-table.h"
+
+#include "gl/xalloc.h"
 
 #include "gettext.h"
+#define N_(msgid) msgid
 #define _(msgid) gettext (msgid)
 
 /* Data parser for textual data like that read by DATA LIST. */
@@ -43,16 +45,15 @@ struct data_parser
   {
     enum data_parser_type type; /* Type of data to parse. */
     int skip_records;           /* Records to skip before first real data. */
-    casenumber max_cases;       /* Max number of cases to read. */
-    int percent_cases;          /* Approximate percent of cases to read. */
 
     struct field *fields;       /* Fields to parse. */
-    size_t field_cnt;           /* Number of fields. */
+    size_t n_fields;            /* Number of fields. */
     size_t field_allocated;     /* Number of fields spaced allocated for. */
 
     /* DP_DELIMITED parsers only. */
     bool span;                  /* May cases span multiple records? */
     bool empty_line_has_field;  /* Does an empty line have an (empty) field? */
+    bool warn_missing_fields;   /* Should missing fields be considered errors? */
     struct substring quotes;    /* Characters that can quote separators. */
     bool quote_escape;          /* Doubled quote acts as escape? */
     struct substring soft_seps; /* Two soft separators act like just one. */
@@ -85,15 +86,14 @@ data_parser_create (void)
 
   parser->type = DP_FIXED;
   parser->skip_records = 0;
-  parser->max_cases = -1;
-  parser->percent_cases = 100;
 
   parser->fields = NULL;
-  parser->field_cnt = 0;
+  parser->n_fields = 0;
   parser->field_allocated = 0;
 
   parser->span = true;
   parser->empty_line_has_field = false;
+  parser->warn_missing_fields = true;
   ss_alloc_substring (&parser->quotes, ss_cstr ("\"'"));
   parser->quote_escape = false;
   ss_alloc_substring (&parser->soft_seps, ss_cstr (CC_SPACES));
@@ -114,7 +114,7 @@ data_parser_destroy (struct data_parser *parser)
     {
       size_t i;
 
-      for (i = 0; i < parser->field_cnt; i++)
+      for (i = 0; i < parser->n_fields; i++)
         free (parser->fields[i].name);
       free (parser->fields);
       ss_dealloc (&parser->quotes);
@@ -137,7 +137,7 @@ data_parser_get_type (const struct data_parser *parser)
 void
 data_parser_set_type (struct data_parser *parser, enum data_parser_type type)
 {
-  assert (parser->field_cnt == 0);
+  assert (parser->n_fields == 0);
   assert (type == DP_FIXED || type == DP_DELIMITED);
   parser->type = type;
 }
@@ -152,24 +152,6 @@ data_parser_set_skip (struct data_parser *parser, int initial_records_to_skip)
   parser->skip_records = initial_records_to_skip;
 }
 
-/* Sets the maximum number of cases parsed by PARSER to
-   MAX_CASES.  The default is -1, meaning no limit. */
-void
-data_parser_set_case_limit (struct data_parser *parser, casenumber max_cases)
-{
-  parser->max_cases = max_cases;
-}
-
-/* Sets the percentage of cases that PARSER should read from the
-   input file to PERCENT_CASES.  By default, all cases are
-   read. */
-void
-data_parser_set_case_percent (struct data_parser *parser, int percent_cases)
-{
-  assert (percent_cases >= 0 && percent_cases <= 100);
-  parser->percent_cases = percent_cases;
-}
-
 /* Returns true if PARSER is configured to allow cases to span
    multiple records. */
 bool
@@ -206,6 +188,21 @@ data_parser_set_empty_line_has_field (struct data_parser *parser,
   parser->empty_line_has_field = empty_line_has_field;
 }
 
+
+/* If WARN_MISSING_FIELDS is true, configures PARSER to emit a warning
+   and cause an error condition when a missing field is encountered.
+   If  WARN_MISSING_FIELDS is false, PARSER will silently fill such
+   fields with the system missing value.
+
+   This setting affects parsing of DP_DELIMITED files only. */
+void
+data_parser_set_warn_missing_fields (struct data_parser *parser,
+                                    bool warn_missing_fields)
+{
+  parser->warn_missing_fields = warn_missing_fields;
+}
+
+
 /* Sets the characters that may be used for quoting field
    contents to QUOTES.  If QUOTES is empty, quoting will be
    disabled.
@@ -291,9 +288,9 @@ add_field (struct data_parser *p, const struct fmt_spec *format, int case_idx,
 {
   struct field *field;
 
-  if (p->field_cnt == p->field_allocated)
+  if (p->n_fields == p->field_allocated)
     p->fields = x2nrealloc (p->fields, &p->field_allocated, sizeof *p->fields);
-  field = &p->fields[p->field_cnt++];
+  field = &p->fields[p->n_fields++];
   field->format = *format;
   field->case_idx = case_idx;
   field->name = xstrdup (name);
@@ -335,8 +332,8 @@ data_parser_add_fixed_field (struct data_parser *parser,
                              int record, int first_column)
 {
   assert (parser->type == DP_FIXED);
-  assert (parser->field_cnt == 0
-          || record >= parser->fields[parser->field_cnt - 1].record);
+  assert (parser->n_fields == 0
+          || record >= parser->fields[parser->n_fields - 1].record);
   if (record > parser->records_per_case)
     parser->records_per_case = record;
   add_field (parser, format, case_idx, name, record, first_column);
@@ -347,7 +344,7 @@ data_parser_add_fixed_field (struct data_parser *parser,
 bool
 data_parser_any_fields (const struct data_parser *parser)
 {
-  return parser->field_cnt > 0;
+  return parser->n_fields > 0;
 }
 
 static void
@@ -358,19 +355,21 @@ set_any_sep (struct data_parser *parser)
 }
 \f
 static bool parse_delimited_span (const struct data_parser *,
-                                  struct dfm_reader *, struct ccase *);
+                                  struct dfm_reader *,
+                                  struct dictionary *, struct ccase *);
 static bool parse_delimited_no_span (const struct data_parser *,
-                                     struct dfm_reader *, struct ccase *);
-static bool parse_fixed (const struct data_parser *,
-                         struct dfm_reader *, struct ccase *);
+                                     struct dfm_reader *,
+                                     struct dictionary *, struct ccase *);
+static bool parse_fixed (const struct data_parser *, struct dfm_reader *,
+                         struct dictionary *, struct ccase *);
 
-/* Reads a case from DFM into C, parsing it with PARSER.  Returns
-   true if successful, false at end of file or on I/O error.
+/* Reads a case from DFM into C, which matches dictionary DICT, parsing it with
+   PARSER.  Returns true if successful, false at end of file or on I/O error.
 
    Case C must not be shared. */
 bool
 data_parser_parse (struct data_parser *parser, struct dfm_reader *reader,
-                   struct ccase *c)
+                   struct dictionary *dict, struct ccase *c)
 {
   bool retval;
 
@@ -387,23 +386,15 @@ data_parser_parse (struct data_parser *parser, struct dfm_reader *reader,
     }
 
   /* Limit cases. */
-  if (parser->max_cases != -1 && parser->max_cases-- == 0)
-    return false;
-  if (parser->percent_cases < 100
-      && dfm_get_percent_read (reader) >= parser->percent_cases)
-    return false;
-
-  dfm_push (reader);
   if (parser->type == DP_DELIMITED)
     {
       if (parser->span)
-        retval = parse_delimited_span (parser, reader, c);
+        retval = parse_delimited_span (parser, reader, dict, c);
       else
-        retval = parse_delimited_no_span (parser, reader, c);
+        retval = parse_delimited_no_span (parser, reader, dict, c);
     }
   else
-    retval = parse_fixed (parser, reader, c);
-  dfm_pop (reader);
+    retval = parse_fixed (parser, reader, dict, c);
 
   return retval;
 }
@@ -414,16 +405,19 @@ data_parser_parse (struct data_parser *parser, struct dfm_reader *reader,
    *FIELD is set to the field content.  The caller must not or
    destroy this constant string.
 
-   After parsing the field, sets the current position in the
-   record to just past the field and any trailing delimiter.
-   Returns 0 on failure or a 1-based column number indicating the
-   beginning of the field on success. */
+   Sets *FIRST_COLUMN to the 1-based column number of the start of
+   the extracted field, and *LAST_COLUMN to the end of the extracted
+   field.
+
+   Returns true on success, false on failure. */
 static bool
 cut_field (const struct data_parser *parser, struct dfm_reader *reader,
            int *first_column, int *last_column, struct string *tmp,
            struct substring *field)
 {
+  size_t length_before_separators;
   struct substring line, p;
+  bool quoted;
 
   if (dfm_eof (reader))
     return false;
@@ -450,62 +444,86 @@ cut_field (const struct data_parser *parser, struct dfm_reader *reader,
     }
 
   *first_column = dfm_column_start (reader);
-  if (ss_find_char (parser->quotes, ss_first (p)) != SIZE_MAX)
+  quoted = ss_find_byte (parser->quotes, ss_first (p)) != SIZE_MAX;
+  if (quoted)
     {
       /* Quoted field. */
-      int quote = ss_get_char (&p);
+      int quote = ss_get_byte (&p);
       if (!ss_get_until (&p, quote, field))
-        msg (SW, _("Quoted string extends beyond end of line."));
+        msg (DW, _("Quoted string extends beyond end of line."));
       if (parser->quote_escape && ss_first (p) == quote)
         {
           ds_assign_substring (tmp, *field);
-          while (ss_match_char (&p, quote))
+          while (ss_match_byte (&p, quote))
             {
               struct substring ss;
-              ds_put_char (tmp, quote);
+              ds_put_byte (tmp, quote);
               if (!ss_get_until (&p, quote, &ss))
-                msg (SW, _("Quoted string extends beyond end of line."));
+                msg (DW, _("Quoted string extends beyond end of line."));
               ds_put_substring (tmp, ss);
             }
           *field = ds_ss (tmp);
         }
-      *last_column = dfm_column_start (reader);
-
-      /* Skip trailing soft separator and a single hard separator
-         if present. */
-      ss_ltrim (&p, parser->soft_seps);
-      if (!ss_is_empty (p)
-          && ss_find_char (parser->hard_seps, ss_first (p)) != SIZE_MAX)
-        ss_advance (&p, 1);
+      *last_column = *first_column + (ss_length (line) - ss_length (p));
     }
   else
     {
       /* Regular field. */
-      ss_get_chars (&p, ss_cspan (p, ds_ss (&parser->any_sep)), field);
-      *last_column = dfm_column_start (reader);
-      if (!ss_ltrim (&p, parser->soft_seps) || ss_is_empty (p))
-        {
-          /* Advance past a trailing hard separator,
-             regardless of whether one actually existed.  If
-             we "skip" a delimiter that was not actually
-             there, then we will return end-of-line on our
-             next call, which is what we want. */
-          dfm_forward_columns (reader, 1);
-        }
+      ss_get_bytes (&p, ss_cspan (p, ds_ss (&parser->any_sep)), field);
+      *last_column = *first_column + ss_length (*field);
+    }
+
+  /* Skip trailing soft separator and a single hard separator if present. */
+  length_before_separators = ss_length (p);
+  ss_ltrim (&p, parser->soft_seps);
+  if (!ss_is_empty (p)
+      && ss_find_byte (parser->hard_seps, ss_first (p)) != SIZE_MAX)
+    {
+      ss_advance (&p, 1);
+      ss_ltrim (&p, parser->soft_seps);
     }
+  if (ss_is_empty (p))
+    dfm_forward_columns (reader, 1);
+  else if (quoted && length_before_separators == ss_length (p))
+    msg (DW, _("Missing delimiter following quoted string."));
   dfm_forward_columns (reader, ss_length (line) - ss_length (p));
 
   return true;
 }
 
-/* Reads a case from READER into C, parsing it according to
-   fixed-format syntax rules in PARSER.
-   Returns true if successful, false at end of file or on I/O error. */
+static void
+parse_error (const struct dfm_reader *reader, const struct field *field,
+             int first_column, int last_column, char *error)
+{
+  int line_number = dfm_get_line_number (reader);
+  struct msg_location *location = xmalloc (sizeof *location);
+  *location = (struct msg_location) {
+    .file_name = intern_new (dfm_get_file_name (reader)),
+    .start = { .line = line_number, .column = first_column },
+    .end = { .line = line_number, .column = last_column - 1 },
+  };
+  struct msg *m = xmalloc (sizeof *m);
+  *m = (struct msg) {
+    .category = MSG_C_DATA,
+    .severity = MSG_S_WARNING,
+    .location = location,
+    .text = xasprintf (_("Data for variable %s is not valid as format %s: %s"),
+                       field->name, fmt_name (field->format.type), error),
+  };
+  msg_emit (m);
+
+  free (error);
+}
+
+/* Reads a case from READER into C, which matches DICT, parsing it according to
+   fixed-format syntax rules in PARSER.  Returns true if successful, false at
+   end of file or on I/O error. */
 static bool
 parse_fixed (const struct data_parser *parser, struct dfm_reader *reader,
-             struct ccase *c)
+             struct dictionary *dict, struct ccase *c)
 {
-  enum legacy_encoding encoding = dfm_reader_get_legacy_encoding (reader);
+  const char *input_encoding = dfm_reader_get_encoding (reader);
+  const char *output_encoding = dict_get_encoding (dict);
   struct field *f;
   int row;
 
@@ -519,20 +537,31 @@ parse_fixed (const struct data_parser *parser, struct dfm_reader *reader,
 
       if (dfm_eof (reader))
         {
-          msg (SW, _("Partial case of %d of %d records discarded."),
+          msg (DW, _("Partial case of %d of %d records discarded."),
                row - 1, parser->records_per_case);
           return false;
         }
       dfm_expand_tabs (reader);
       line = dfm_get_record (reader);
 
-      for (; f < &parser->fields[parser->field_cnt] && f->record == row; f++)
-        data_in (ss_substr (line, f->first_column - 1,
-                            f->format.w),
-                 encoding, f->format.type, f->format.d,
-                 f->first_column, f->first_column + f->format.w,
-                 case_data_rw_idx (c, f->case_idx),
-                 fmt_var_width (&f->format));
+      for (; f < &parser->fields[parser->n_fields] && f->record == row; f++)
+        {
+          struct substring s = ss_substr (line, f->first_column - 1,
+                                          f->format.w);
+          union value *value = case_data_rw_idx (c, f->case_idx);
+          char *error = data_in (s, input_encoding, f->format.type,
+                                 settings_get_fmt_settings (),
+                                 value, fmt_var_width (&f->format),
+                                 output_encoding);
+
+          if (error == NULL)
+            data_in_imply_decimals (s, input_encoding, f->format.type,
+                                    f->format.d, settings_get_fmt_settings (),
+                                    value);
+          else
+            parse_error (reader, f, f->first_column,
+                         f->first_column + f->format.w, error);
+        }
 
       dfm_forward_record (reader);
     }
@@ -540,21 +569,23 @@ parse_fixed (const struct data_parser *parser, struct dfm_reader *reader,
   return true;
 }
 
-/* Reads a case from READER into C, parsing it according to
-   free-format syntax rules in PARSER.
-   Returns true if successful, false at end of file or on I/O error. */
+/* Reads a case from READER into C, which matches dictionary DICT, parsing it
+   according to free-format syntax rules in PARSER.  Returns true if
+   successful, false at end of file or on I/O error. */
 static bool
 parse_delimited_span (const struct data_parser *parser,
-                      struct dfm_reader *reader, struct ccase *c)
+                      struct dfm_reader *reader,
+                      struct dictionary *dict, struct ccase *c)
 {
-  enum legacy_encoding encoding = dfm_reader_get_legacy_encoding (reader);
+  const char *output_encoding = dict_get_encoding (dict);
   struct string tmp = DS_EMPTY_INITIALIZER;
   struct field *f;
 
-  for (f = parser->fields; f < &parser->fields[parser->field_cnt]; f++)
+  for (f = parser->fields; f < &parser->fields[parser->n_fields]; f++)
     {
       struct substring s;
       int first_column, last_column;
+      char *error;
 
       /* Cut out a field and read in a new record if necessary. */
       while (!cut_field (parser, reader,
@@ -565,63 +596,73 @@ parse_delimited_span (const struct data_parser *parser,
          if (dfm_eof (reader))
            {
              if (f > parser->fields)
-               msg (SW, _("Partial case discarded.  The first variable "
+               msg (DW, _("Partial case discarded.  The first variable "
                            "missing was %s."), f->name);
               ds_destroy (&tmp);
              return false;
            }
        }
 
-      data_in (s, encoding, f->format.type, 0,
-               first_column, last_column,
-               case_data_rw_idx (c, f->case_idx),
-               fmt_var_width (&f->format));
+      const char *input_encoding = dfm_reader_get_encoding (reader);
+      error = data_in (s, input_encoding, f->format.type,
+                       settings_get_fmt_settings (),
+                       case_data_rw_idx (c, f->case_idx),
+                       fmt_var_width (&f->format), output_encoding);
+      if (error != NULL)
+        parse_error (reader, f, first_column, last_column, error);
     }
   ds_destroy (&tmp);
   return true;
 }
 
-/* Reads a case from READER into C, parsing it according to
-   delimited syntax rules with one case per record in PARSER.
+/* Reads a case from READER into C, which matches dictionary DICT, parsing it
+   according to delimited syntax rules with one case per record in PARSER.
    Returns true if successful, false at end of file or on I/O error. */
 static bool
 parse_delimited_no_span (const struct data_parser *parser,
-                         struct dfm_reader *reader, struct ccase *c)
+                         struct dfm_reader *reader,
+                         struct dictionary *dict, struct ccase *c)
 {
-  enum legacy_encoding encoding = dfm_reader_get_legacy_encoding (reader);
+  const char *output_encoding = dict_get_encoding (dict);
   struct string tmp = DS_EMPTY_INITIALIZER;
   struct substring s;
-  struct field *f;
+  struct field *f, *end;
 
   if (dfm_eof (reader))
     return false;
 
-  for (f = parser->fields; f < &parser->fields[parser->field_cnt]; f++)
+  end = &parser->fields[parser->n_fields];
+  for (f = parser->fields; f < end; f++)
     {
       int first_column, last_column;
+      char *error;
+
       if (!cut_field (parser, reader, &first_column, &last_column, &tmp, &s))
        {
-         if (settings_get_undefined ())
-           msg (SW, _("Missing value(s) for all variables from %s onward.  "
+         if (f < end - 1 && settings_get_undefined () && parser->warn_missing_fields)
+           msg (DW, _("Missing value(s) for all variables from %s onward.  "
                        "These will be filled with the system-missing value "
                        "or blanks, as appropriate."),
                 f->name);
-          for (; f < &parser->fields[parser->field_cnt]; f++)
+          for (; f < end; f++)
             value_set_missing (case_data_rw_idx (c, f->case_idx),
                                fmt_var_width (&f->format));
           goto exit;
        }
 
-      data_in (s, encoding, f->format.type, 0,
-               first_column, last_column,
-               case_data_rw_idx (c, f->case_idx),
-               fmt_var_width (&f->format));
+      const char *input_encoding = dfm_reader_get_encoding (reader);
+      error = data_in (s, input_encoding, f->format.type,
+                       settings_get_fmt_settings (),
+                       case_data_rw_idx (c, f->case_idx),
+                       fmt_var_width (&f->format), output_encoding);
+      if (error != NULL)
+        parse_error (reader, f, first_column, last_column, error);
     }
 
   s = dfm_get_record (reader);
   ss_ltrim (&s, parser->soft_seps);
   if (!ss_is_empty (s))
-    msg (SW, _("Record ends in data not part of any field."));
+    msg (DW, _("Record ends in data not part of any field."));
 
 exit:
   dfm_forward_record (reader);
@@ -635,39 +676,48 @@ static void
 dump_fixed_table (const struct data_parser *parser,
                   const struct file_handle *fh)
 {
-  struct tab_table *t;
-  size_t i;
-
-  t = tab_create (4, parser->field_cnt + 1, 0);
-  tab_columns (t, TAB_COL_DOWN, 1);
-  tab_headers (t, 0, 0, 1, 0);
-  tab_text (t, 0, 0, TAB_CENTER | TAT_TITLE, _("Variable"));
-  tab_text (t, 1, 0, TAB_CENTER | TAT_TITLE, _("Record"));
-  tab_text (t, 2, 0, TAB_CENTER | TAT_TITLE, _("Columns"));
-  tab_text (t, 3, 0, TAB_CENTER | TAT_TITLE, _("Format"));
-  tab_box (t, TAL_1, TAL_1, TAL_0, TAL_1, 0, 0, 3, parser->field_cnt);
-  tab_hline (t, TAL_2, 0, 3, 1);
-  tab_dim (t, tab_natural_dimensions, NULL, NULL);
-
-  for (i = 0; i < parser->field_cnt; i++)
+  /* XXX This should not be preformatted. */
+  char *title = xasprintf (ngettext ("Reading %d record from %s.",
+                                     "Reading %d records from %s.",
+                                     parser->records_per_case),
+                           parser->records_per_case, fh_get_name (fh));
+  struct pivot_table *table = pivot_table_create__ (
+    pivot_value_new_user_text (title, -1), "Fixed Data Records");
+  free (title);
+
+  pivot_dimension_create (
+    table, PIVOT_AXIS_COLUMN, N_("Attributes"),
+    N_("Record"), N_("Columns"), N_("Format"));
+
+  struct pivot_dimension *variables = pivot_dimension_create (
+    table, PIVOT_AXIS_ROW, N_("Variable"));
+  variables->root->show_label = true;
+  for (size_t i = 0; i < parser->n_fields; i++)
     {
       struct field *f = &parser->fields[i];
-      char fmt_string[FMT_STRING_LEN_MAX + 1];
-      int row = i + 1;
-
-      tab_text (t, 0, row, TAB_LEFT, f->name);
-      tab_text (t, 1, row, TAT_PRINTF, "%d", f->record);
-      tab_text (t, 2, row, TAT_PRINTF, "%3d-%3d",
-                f->first_column, f->first_column + f->format.w - 1);
-      tab_text (t, 3, row, TAB_LEFT | TAB_FIX,
-                fmt_to_string (&f->format, fmt_string));
+
+      /* XXX It would be better to have the actual variable here. */
+      int variable_idx = pivot_category_create_leaf (
+        variables->root, pivot_value_new_user_text (f->name, -1));
+
+      pivot_table_put2 (table, 0, variable_idx,
+                        pivot_value_new_integer (f->record));
+
+      int first_column = f->first_column;
+      int last_column = f->first_column + f->format.w - 1;
+      char *columns = xasprintf ("%d-%d", first_column, last_column);
+      pivot_table_put2 (table, 1, variable_idx,
+                        pivot_value_new_user_text (columns, -1));
+      free (columns);
+
+      char str[FMT_STRING_LEN_MAX + 1];
+      pivot_table_put2 (table, 2, variable_idx,
+                        pivot_value_new_user_text (
+                          fmt_to_string (&f->format, str), -1));
+
     }
 
-  tab_title (t, ngettext ("Reading %d record from %s.",
-                          "Reading %d records from %s.",
-                          parser->records_per_case),
-             parser->records_per_case, fh_get_name (fh));
-  tab_submit (t);
+  pivot_table_submit (table);
 }
 
 /* Displays a table giving information on free-format variable parsing
@@ -676,32 +726,32 @@ static void
 dump_delimited_table (const struct data_parser *parser,
                       const struct file_handle *fh)
 {
-  struct tab_table *t;
-  size_t i;
+  struct pivot_table *table = pivot_table_create__ (
+    pivot_value_new_text_format (N_("Reading free-form data from %s."),
+                                 fh_get_name (fh)),
+    "Free-Form Data Records");
 
-  t = tab_create (2, parser->field_cnt + 1, 0);
-  tab_columns (t, TAB_COL_DOWN, 1);
-  tab_headers (t, 0, 0, 1, 0);
-  tab_text (t, 0, 0, TAB_CENTER | TAT_TITLE, _("Variable"));
-  tab_text (t, 1, 0, TAB_CENTER | TAT_TITLE, _("Format"));
-  tab_box (t, TAL_1, TAL_1, TAL_0, TAL_1, 0, 0, 1, parser->field_cnt);
-  tab_hline (t, TAL_2, 0, 1, 1);
-  tab_dim (t, tab_natural_dimensions, NULL, NULL);
+  pivot_dimension_create (
+    table, PIVOT_AXIS_COLUMN, N_("Attributes"), N_("Format"));
 
-  for (i = 0; i < parser->field_cnt; i++)
+  struct pivot_dimension *variables = pivot_dimension_create (
+    table, PIVOT_AXIS_ROW, N_("Variable"));
+  variables->root->show_label = true;
+  for (size_t i = 0; i < parser->n_fields; i++)
     {
       struct field *f = &parser->fields[i];
-      char str[FMT_STRING_LEN_MAX + 1];
-      int row = i + 1;
 
-      tab_text (t, 0, row, TAB_LEFT, f->name);
-      tab_text (t, 1, row, TAB_LEFT | TAB_FIX,
-                fmt_to_string (&f->format, str));
-    }
+      /* XXX It would be better to have the actual variable here. */
+      int variable_idx = pivot_category_create_leaf (
+        variables->root, pivot_value_new_user_text (f->name, -1));
 
-  tab_title (t, _("Reading free-form data from %s."), fh_get_name (fh));
+      char str[FMT_STRING_LEN_MAX + 1];
+      pivot_table_put2 (table, 0, variable_idx,
+                        pivot_value_new_user_text (
+                          fmt_to_string (&f->format, str), -1));
+    }
 
-  tab_submit (t);
+  pivot_table_submit (table);
 }
 
 /* Displays a table giving information on how PARSER will read
@@ -720,41 +770,56 @@ data_parser_output_description (struct data_parser *parser,
 struct data_parser_casereader
   {
     struct data_parser *parser; /* Parser. */
+    struct dictionary *dict;    /* Dictionary. */
     struct dfm_reader *reader;  /* Data file reader. */
     struct caseproto *proto;    /* Format of cases. */
   };
 
 static const struct casereader_class data_parser_casereader_class;
 
-/* Replaces DS's active file by an input program that reads data
+/* Replaces DS's active dataset by an input program that reads data
    from READER according to the rules in PARSER, using DICT as
    the underlying dictionary.  Ownership of PARSER and READER is
    transferred to the input program, and ownership of DICT is
    transferred to the dataset. */
 void
 data_parser_make_active_file (struct data_parser *parser, struct dataset *ds,
-                              struct dfm_reader *reader,
-                              struct dictionary *dict)
+                              struct dfm_reader *reader,
+                              struct dictionary *dict,
+                              struct casereader* (*func)(struct casereader *,
+                                                         const struct dictionary *,
+                                                         void *),
+                              void *ud)
 {
   struct data_parser_casereader *r;
-  struct casereader *casereader;
+  struct casereader *casereader0;
+  struct casereader *casereader1;
 
   r = xmalloc (sizeof *r);
   r->parser = parser;
+  r->dict = dict_ref (dict);
   r->reader = reader;
   r->proto = caseproto_ref (dict_get_proto (dict));
-  casereader = casereader_create_sequential (NULL, r->proto,
+  casereader0 = casereader_create_sequential (NULL, r->proto,
                                              CASENUMBER_MAX,
                                              &data_parser_casereader_class, r);
-  proc_set_active_file (ds, casereader, dict);
+
+  if (func)
+    casereader1 = func (casereader0, dict, ud);
+  else
+    casereader1 = casereader0;
+
+  dataset_set_dict (ds, dict);
+  dataset_set_source (ds, casereader1);
 }
 
+
 static struct ccase *
 data_parser_casereader_read (struct casereader *reader UNUSED, void *r_)
 {
   struct data_parser_casereader *r = r_;
   struct ccase *c = case_create (r->proto);
-  if (data_parser_parse (r->parser, r->reader, c))
+  if (data_parser_parse (r->parser, r->reader, r->dict, c))
     return c;
   else
     {
@@ -764,14 +829,15 @@ data_parser_casereader_read (struct casereader *reader UNUSED, void *r_)
 }
 
 static void
-data_parser_casereader_destroy (struct casereader *reader UNUSED, void *r_)
+data_parser_casereader_destroy (struct casereader *reader, void *r_)
 {
   struct data_parser_casereader *r = r_;
   if (dfm_reader_error (r->reader))
     casereader_force_error (reader);
-  data_parser_destroy (r->parser);
   dfm_close_reader (r->reader);
   caseproto_unref (r->proto);
+  dict_unref (r->dict);
+  data_parser_destroy (r->parser);
   free (r);
 }