Implement SKIP keyword on DATA LIST. Fixes bug #17099.
[pspp-builds.git] / src / language / data-io / data-list.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <ctype.h>
23 #include <float.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26
27 #include <data/case-source.h>
28 #include <data/case.h>
29 #include <data/case-source.h>
30 #include <data/data-in.h>
31 #include <data/dictionary.h>
32 #include <data/format.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/transformations.h>
36 #include <data/variable.h>
37 #include <language/command.h>
38 #include <language/data-io/data-reader.h>
39 #include <language/data-io/file-handle.h>
40 #include <language/data-io/inpt-pgm.h>
41 #include <language/data-io/placement-parser.h>
42 #include <language/lexer/format-parser.h>
43 #include <language/lexer/lexer.h>
44 #include <language/lexer/variable-parser.h>
45 #include <libpspp/alloc.h>
46 #include <libpspp/assertion.h>
47 #include <libpspp/compiler.h>
48 #include <libpspp/ll.h>
49 #include <libpspp/message.h>
50 #include <libpspp/misc.h>
51 #include <libpspp/pool.h>
52 #include <libpspp/str.h>
53 #include <output/table.h>
54
55 #include "size_max.h"
56 #include "xsize.h"
57
58 #include "gettext.h"
59 #define _(msgid) gettext (msgid)
60 \f
61 /* Utility function. */
62
63 /* Describes how to parse one variable. */
64 struct dls_var_spec
65   {
66     struct ll ll;               /* List element. */
67
68     /* All parsers. */
69     struct fmt_spec input;      /* Input format of this field. */
70     int fv;                     /* First value in case. */
71     char name[LONG_NAME_LEN + 1]; /* Var name for error messages and tables. */
72
73     /* Fixed format only. */
74     int record;                 /* Record number (1-based). */
75     int first_column;           /* Column numbers in record. */
76   };
77
78 static struct dls_var_spec *
79 ll_to_dls_var_spec (struct ll *ll) 
80 {
81   return ll_data (ll, struct dls_var_spec, ll);
82 }
83
84 /* Constants for DATA LIST type. */
85 enum dls_type
86   {
87     DLS_FIXED,
88     DLS_FREE,
89     DLS_LIST
90   };
91
92 /* DATA LIST private data structure. */
93 struct data_list_pgm
94   {
95     struct pool *pool;          /* Used for all DATA LIST storage. */
96     struct ll_list specs;       /* List of dls_var_specs. */
97     struct dfm_reader *reader;  /* Data file reader. */
98     enum dls_type type;         /* Type of DATA LIST construct. */
99     struct variable *end;       /* Variable specified on END subcommand. */
100     int record_cnt;             /* Number of records. */
101     struct string delims;       /* Field delimiters. */
102     int skip_records;           /* Records to skip before first case. */
103   };
104
105 static const struct case_source_class data_list_source_class;
106
107 static bool parse_fixed (struct dictionary *dict, 
108                          struct pool *tmp_pool, struct data_list_pgm *);
109 static bool parse_free (struct dictionary *dict, 
110                         struct pool *tmp_pool, struct data_list_pgm *);
111 static void dump_fixed_table (const struct ll_list *,
112                               const struct file_handle *, int record_cnt);
113 static void dump_free_table (const struct data_list_pgm *,
114                              const struct file_handle *);
115
116 static trns_free_func data_list_trns_free;
117 static trns_proc_func data_list_trns_proc;
118
119 int
120 cmd_data_list (struct dataset *ds)
121 {
122   struct dictionary *dict = dataset_dict (ds);
123   struct data_list_pgm *dls;
124   int table = -1;                /* Print table if nonzero, -1=undecided. */
125   struct file_handle *fh = fh_inline_file ();
126   struct pool *tmp_pool;
127   bool ok;
128
129   if (!in_input_program ())
130     discard_variables (ds);
131
132   dls = pool_create_container (struct data_list_pgm, pool);
133   ll_init (&dls->specs);
134   dls->reader = NULL;
135   dls->type = -1;
136   dls->end = NULL;
137   dls->record_cnt = 0;
138   dls->skip_records = 0;
139   ds_init_empty (&dls->delims);
140   ds_register_pool (&dls->delims, dls->pool);
141
142   tmp_pool = pool_create_subpool (dls->pool);
143
144   while (token != '/')
145     {
146       if (lex_match_id ("FILE"))
147         {
148           lex_match ('=');
149           fh = fh_parse (FH_REF_FILE | FH_REF_INLINE);
150           if (fh == NULL)
151             goto error;
152         }
153       else if (lex_match_id ("RECORDS"))
154         {
155           lex_match ('=');
156           lex_match ('(');
157           if (!lex_force_int ())
158             goto error;
159           dls->record_cnt = lex_integer ();
160           lex_get ();
161           lex_match (')');
162         }
163       else if (lex_match_id ("SKIP"))
164         {
165           lex_match ('=');
166           if (!lex_force_int ())
167             goto error;
168           dls->skip_records = lex_integer ();
169           lex_get ();
170         }
171       else if (lex_match_id ("END"))
172         {
173           if (dls->end)
174             {
175               msg (SE, _("The END subcommand may only be specified once."));
176               goto error;
177             }
178           
179           lex_match ('=');
180           if (!lex_force_id ())
181             goto error;
182           dls->end = dict_lookup_var (dataset_dict (ds), tokid);
183           if (!dls->end) 
184             dls->end = dict_create_var_assert (dataset_dict (ds), tokid, 0);
185           lex_get ();
186         }
187       else if (token == T_ID)
188         {
189           if (lex_match_id ("NOTABLE"))
190             table = 0;
191           else if (lex_match_id ("TABLE"))
192             table = 1;
193           else 
194             {
195               int type;
196               if (lex_match_id ("FIXED"))
197                 type = DLS_FIXED;
198               else if (lex_match_id ("FREE"))
199                 type = DLS_FREE;
200               else if (lex_match_id ("LIST"))
201                 type = DLS_LIST;
202               else 
203                 {
204                   lex_error (NULL);
205                   goto error;
206                 }
207
208               if (dls->type != -1)
209                 {
210                   msg (SE, _("Only one of FIXED, FREE, or LIST may "
211                              "be specified."));
212                   goto error;
213                 }
214               dls->type = type;
215
216               if ((dls->type == DLS_FREE || dls->type == DLS_LIST)
217                   && lex_match ('(')) 
218                 {
219                   while (!lex_match (')'))
220                     {
221                       int delim;
222
223                       if (lex_match_id ("TAB"))
224                         delim = '\t';
225                       else if (token == T_STRING && ds_length (&tokstr) == 1)
226                         {
227                           delim = ds_first (&tokstr);
228                           lex_get ();
229                         }
230                       else 
231                         {
232                           lex_error (NULL);
233                           goto error;
234                         }
235
236                       ds_put_char (&dls->delims, delim);
237
238                       lex_match (',');
239                     }
240                 }
241             }
242         }
243       else
244         {
245           lex_error (NULL);
246           goto error;
247         }
248     }
249
250   fh_set_default_handle (fh);
251
252   if (dls->type == -1)
253     dls->type = DLS_FIXED;
254
255   if (table == -1)
256     table = dls->type != DLS_FREE;
257
258   ok = (dls->type == DLS_FIXED ? parse_fixed : parse_free) (dict, tmp_pool, dls);
259   if (!ok)
260     goto error;
261
262   if (lex_end_of_command () != CMD_SUCCESS)
263     goto error;
264
265   if (table)
266     {
267       if (dls->type == DLS_FIXED)
268         dump_fixed_table (&dls->specs, fh, dls->record_cnt);
269       else
270         dump_free_table (dls, fh);
271     }
272
273   dls->reader = dfm_open_reader (fh);
274   if (dls->reader == NULL)
275     goto error;
276
277   if (in_input_program ())
278     add_transformation (ds, data_list_trns_proc, data_list_trns_free, dls);
279   else 
280     proc_set_source (ds, create_case_source (&data_list_source_class, dls));
281
282   pool_destroy (tmp_pool);
283
284   return CMD_SUCCESS;
285
286  error:
287   data_list_trns_free (dls);
288   return CMD_CASCADING_FAILURE;
289 }
290 \f
291 /* Fixed-format parsing. */
292
293 /* Parses all the variable specifications for DATA LIST FIXED,
294    storing them into DLS.  Uses TMP_POOL for data that is not
295    needed once parsing is complete.  Returns true only if
296    successful. */
297 static bool
298 parse_fixed (struct dictionary *dict, 
299              struct pool *tmp_pool, struct data_list_pgm *dls)
300 {
301   int last_nonempty_record;
302   int record = 0;
303   int column = 1;
304
305   while (token != '.')
306     {
307       char **names;
308       size_t name_cnt, name_idx;
309       struct fmt_spec *formats, *f;
310       size_t format_cnt;
311
312       /* Parse everything. */
313       if (!parse_record_placement (&record, &column)
314           || !parse_DATA_LIST_vars_pool (tmp_pool, &names, &name_cnt, PV_NONE)
315           || !parse_var_placements (tmp_pool, name_cnt, true,
316                                     &formats, &format_cnt))
317         return false;
318
319       /* Create variables and var specs. */
320       name_idx = 0;
321       for (f = formats; f < &formats[format_cnt]; f++)
322         if (!execute_placement_format (f, &record, &column))
323           {
324             char *name;
325             int width;
326             struct variable *v;
327             struct dls_var_spec *spec;
328               
329             name = names[name_idx++];
330
331             /* Create variable. */
332             width = fmt_var_width (f);
333             v = dict_create_var (dict, name, width);
334             if (v != NULL)
335               {
336                 /* Success. */
337                 struct fmt_spec output = fmt_for_output_from_input (f);
338                 v->print = output;
339                 v->write = output;
340               }
341             else
342               {
343                 /* Failure.
344                    This can be acceptable if we're in INPUT
345                    PROGRAM, but only if the existing variable has
346                    the same width as the one we would have
347                    created. */ 
348                 if (!in_input_program ())
349                   {
350                     msg (SE, _("%s is a duplicate variable name."), name);
351                     return false;
352                   }
353
354                 v = dict_lookup_var_assert (dict, name);
355                 if ((width != 0) != (v->width != 0))
356                   {
357                     msg (SE, _("There is already a variable %s of a "
358                                "different type."),
359                          name);
360                     return false;
361                   }
362                 if (width != 0 && width != v->width)
363                   {
364                     msg (SE, _("There is already a string variable %s of a "
365                                "different width."), name);
366                     return false;
367                   }
368               }
369
370             /* Create specifier for parsing the variable. */
371             spec = pool_alloc (dls->pool, sizeof *spec);
372             spec->input = *f;
373             spec->fv = v->fv;
374             spec->record = record;
375             spec->first_column = column;
376             strcpy (spec->name, v->name);
377             ll_push_tail (&dls->specs, &spec->ll);
378
379             column += f->w;
380           }
381       assert (name_idx == name_cnt);
382     }
383   if (ll_is_empty (&dls->specs)) 
384     {
385       msg (SE, _("At least one variable must be specified."));
386       return false;
387     }
388
389   last_nonempty_record = ll_to_dls_var_spec (ll_tail (&dls->specs))->record;
390   if (dls->record_cnt && last_nonempty_record > dls->record_cnt)
391     {
392       msg (SE, _("Variables are specified on records that "
393                  "should not exist according to RECORDS subcommand."));
394       return false;
395     }
396   else if (!dls->record_cnt) 
397     dls->record_cnt = last_nonempty_record;
398
399   return true;
400 }
401
402 /* Displays a table giving information on fixed-format variable
403    parsing on DATA LIST. */
404 static void
405 dump_fixed_table (const struct ll_list *specs,
406                   const struct file_handle *fh, int record_cnt)
407 {
408   size_t spec_cnt;
409   struct tab_table *t;
410   struct dls_var_spec *spec;
411   int row;
412
413   spec_cnt = ll_count (specs);
414   t = tab_create (4, spec_cnt + 1, 0);
415   tab_columns (t, TAB_COL_DOWN, 1);
416   tab_headers (t, 0, 0, 1, 0);
417   tab_text (t, 0, 0, TAB_CENTER | TAT_TITLE, _("Variable"));
418   tab_text (t, 1, 0, TAB_CENTER | TAT_TITLE, _("Record"));
419   tab_text (t, 2, 0, TAB_CENTER | TAT_TITLE, _("Columns"));
420   tab_text (t, 3, 0, TAB_CENTER | TAT_TITLE, _("Format"));
421   tab_box (t, TAL_1, TAL_1, TAL_0, TAL_1, 0, 0, 3, spec_cnt);
422   tab_hline (t, TAL_2, 0, 3, 1);
423   tab_dim (t, tab_natural_dimensions);
424
425   row = 1;
426   ll_for_each (spec, struct dls_var_spec, ll, specs)
427     {
428       char fmt_string[FMT_STRING_LEN_MAX + 1];
429       tab_text (t, 0, row, TAB_LEFT, spec->name);
430       tab_text (t, 1, row, TAT_PRINTF, "%d", spec->record);
431       tab_text (t, 2, row, TAT_PRINTF, "%3d-%3d",
432                 spec->first_column, spec->first_column + spec->input.w - 1);
433       tab_text (t, 3, row, TAB_LEFT | TAB_FIX,
434                 fmt_to_string (&spec->input, fmt_string));
435       row++;
436     }
437
438   tab_title (t, ngettext ("Reading %d record from %s.",
439                           "Reading %d records from %s.", record_cnt),
440              record_cnt, fh_get_name (fh));
441   tab_submit (t);
442 }
443 \f
444 /* Free-format parsing. */
445
446 /* Parses variable specifications for DATA LIST FREE and adds
447    them to DLS.  Uses TMP_POOL for data that is not needed once
448    parsing is complete.  Returns true only if successful. */
449 static bool
450 parse_free (struct dictionary *dict, struct pool *tmp_pool, struct data_list_pgm *dls)
451 {
452   lex_get ();
453   while (token != '.')
454     {
455       struct fmt_spec input, output;
456       char **name;
457       size_t name_cnt;
458       size_t i;
459
460       if (!parse_DATA_LIST_vars_pool (tmp_pool, &name, &name_cnt, PV_NONE))
461         return 0;
462
463       if (lex_match ('('))
464         {
465           if (!parse_format_specifier (&input)
466               || !fmt_check_input (&input)
467               || !lex_force_match (')')) 
468             return NULL;
469           output = fmt_for_output_from_input (&input);
470         }
471       else
472         {
473           lex_match ('*');
474           input = fmt_for_input (FMT_F, 8, 0);
475           output = *get_format ();
476         }
477
478       for (i = 0; i < name_cnt; i++)
479         {
480           struct dls_var_spec *spec;
481           struct variable *v;
482
483           v = dict_create_var (dict, name[i], fmt_var_width (&input));
484           if (v == NULL)
485             {
486               msg (SE, _("%s is a duplicate variable name."), name[i]);
487               return 0;
488             }
489           v->print = v->write = output;
490
491           spec = pool_alloc (dls->pool, sizeof *spec);
492           spec->input = input;
493           spec->fv = v->fv;
494           strcpy (spec->name, v->name);
495           ll_push_tail (&dls->specs, &spec->ll);
496         }
497     }
498
499   return true;
500 }
501
502 /* Displays a table giving information on free-format variable parsing
503    on DATA LIST. */
504 static void
505 dump_free_table (const struct data_list_pgm *dls,
506                  const struct file_handle *fh)
507 {
508   struct tab_table *t;
509   struct dls_var_spec *spec;
510   size_t spec_cnt;
511   int row;
512
513   spec_cnt = ll_count (&dls->specs);
514   
515   t = tab_create (2, spec_cnt + 1, 0);
516   tab_columns (t, TAB_COL_DOWN, 1);
517   tab_headers (t, 0, 0, 1, 0);
518   tab_text (t, 0, 0, TAB_CENTER | TAT_TITLE, _("Variable"));
519   tab_text (t, 1, 0, TAB_CENTER | TAT_TITLE, _("Format"));
520   tab_box (t, TAL_1, TAL_1, TAL_0, TAL_1, 0, 0, 1, spec_cnt);
521   tab_hline (t, TAL_2, 0, 1, 1);
522   tab_dim (t, tab_natural_dimensions);
523   row = 1;
524   ll_for_each (spec, struct dls_var_spec, ll, &dls->specs)
525     {
526       char str[FMT_STRING_LEN_MAX + 1];
527       tab_text (t, 0, row, TAB_LEFT, spec->name);
528       tab_text (t, 1, row, TAB_LEFT | TAB_FIX,
529                 fmt_to_string (&spec->input, str));
530       row++;
531     }
532
533   tab_title (t, _("Reading free-form data from %s."), fh_get_name (fh));
534   
535   tab_submit (t);
536 }
537 \f
538 /* Input procedure. */ 
539
540 /* Extracts a field from the current position in the current
541    record.  Fields can be unquoted or quoted with single- or
542    double-quote characters.
543
544    *FIELD is set to the field content.  The caller must not
545    or destroy this constant string.
546    
547    After parsing the field, sets the current position in the
548    record to just past the field and any trailing delimiter.
549    Returns 0 on failure or a 1-based column number indicating the
550    beginning of the field on success. */
551 static bool
552 cut_field (const struct data_list_pgm *dls, struct substring *field)
553 {
554   struct substring line, p;
555
556   if (dfm_eof (dls->reader))
557     return false;
558   if (ds_is_empty (&dls->delims))
559     dfm_expand_tabs (dls->reader);
560   line = p = dfm_get_record (dls->reader);
561
562   if (ds_is_empty (&dls->delims)) 
563     {
564       bool missing_quote = false;
565       
566       /* Skip leading whitespace. */
567       ss_ltrim (&p, ss_cstr (CC_SPACES));
568       if (ss_is_empty (p))
569         return false;
570       
571       /* Handle actual data, whether quoted or unquoted. */
572       if (ss_match_char (&p, '\''))
573         missing_quote = !ss_get_until (&p, '\'', field);
574       else if (ss_match_char (&p, '"'))
575         missing_quote = !ss_get_until (&p, '"', field);
576       else
577         ss_get_chars (&p, ss_cspan (p, ss_cstr ("," CC_SPACES)), field);
578       if (missing_quote)
579         msg (SW, _("Quoted string extends beyond end of line."));
580
581       /* Skip trailing whitespace and a single comma if present. */
582       ss_ltrim (&p, ss_cstr (CC_SPACES));
583       ss_match_char (&p, ',');
584
585       dfm_forward_columns (dls->reader, ss_length (line) - ss_length (p));
586     }
587   else 
588     {
589       if (!ss_is_empty (p))
590         ss_get_chars (&p, ss_cspan (p, ds_ss (&dls->delims)), field);
591       else if (dfm_columns_past_end (dls->reader) == 0)
592         {
593           /* A blank line or a line that ends in a delimiter has a
594              trailing blank field. */
595           *field = p;
596         }
597       else 
598         return false;
599
600       /* Advance past the field.
601          
602          Also advance past a trailing delimiter, regardless of
603          whether one actually existed.  If we "skip" a delimiter
604          that was not actually there, then we will return
605          end-of-line on our next call, which is what we want. */
606       dfm_forward_columns (dls->reader, ss_length (line) - ss_length (p) + 1);
607     }
608   return true;
609 }
610
611 static bool read_from_data_list_fixed (const struct data_list_pgm *,
612                                        struct ccase *);
613 static bool read_from_data_list_free (const struct data_list_pgm *,
614                                       struct ccase *);
615 static bool read_from_data_list_list (const struct data_list_pgm *,
616                                       struct ccase *);
617
618 /* Reads a case from DLS into C.
619    Returns true if successful, false at end of file or on I/O error. */
620 static bool
621 read_from_data_list (const struct data_list_pgm *dls, struct ccase *c) 
622 {
623   bool retval;
624
625   dfm_push (dls->reader);
626   switch (dls->type)
627     {
628     case DLS_FIXED:
629       retval = read_from_data_list_fixed (dls, c);
630       break;
631     case DLS_FREE:
632       retval = read_from_data_list_free (dls, c);
633       break;
634     case DLS_LIST:
635       retval = read_from_data_list_list (dls, c);
636       break;
637     default:
638       NOT_REACHED ();
639     }
640   dfm_pop (dls->reader);
641
642   return retval;
643 }
644
645 /* Reads a case from the data file into C, parsing it according
646    to fixed-format syntax rules in DLS.  
647    Returns true if successful, false at end of file or on I/O error. */
648 static bool
649 read_from_data_list_fixed (const struct data_list_pgm *dls, struct ccase *c)
650 {
651   struct dls_var_spec *spec;
652   int row;
653
654   if (dfm_eof (dls->reader)) 
655     return false; 
656
657   spec = ll_to_dls_var_spec (ll_head (&dls->specs));
658   for (row = 1; row <= dls->record_cnt; row++)
659     {
660       struct substring line;
661
662       if (dfm_eof (dls->reader))
663         {
664           msg (SW, _("Partial case of %d of %d records discarded."),
665                row - 1, dls->record_cnt);
666           return false;
667         } 
668       dfm_expand_tabs (dls->reader);
669       line = dfm_get_record (dls->reader);
670
671       ll_for_each_continue (spec, struct dls_var_spec, ll, &dls->specs) 
672         {
673           struct data_in di;
674
675           data_in_finite_line (&di, ss_data (line), ss_length (line),
676                                spec->first_column,
677                                spec->first_column + spec->input.w - 1);
678           di.v = case_data_rw (c, spec->fv);
679           di.flags = DI_IMPLIED_DECIMALS;
680           di.f1 = spec->first_column;
681           di.format = spec->input;
682
683           data_in (&di); 
684         }
685
686       dfm_forward_record (dls->reader);
687     }
688
689   return true;
690 }
691
692 /* Reads a case from the data file into C, parsing it according
693    to free-format syntax rules in DLS.  
694    Returns true if successful, false at end of file or on I/O error. */
695 static bool
696 read_from_data_list_free (const struct data_list_pgm *dls, struct ccase *c)
697 {
698   struct dls_var_spec *spec;
699
700   ll_for_each (spec, struct dls_var_spec, ll, &dls->specs)
701     {
702       struct substring field;
703       struct data_in di;
704       
705       /* Cut out a field and read in a new record if necessary. */
706       while (!cut_field (dls, &field))
707         {
708           if (!dfm_eof (dls->reader)) 
709             dfm_forward_record (dls->reader);
710           if (dfm_eof (dls->reader))
711             {
712               if (&spec->ll != ll_head (&dls->specs))
713                 msg (SW, _("Partial case discarded.  The first variable "
714                            "missing was %s."), spec->name);
715               return false;
716             }
717         }
718       
719       di.s = ss_data (field);
720       di.e = ss_end (field);
721       di.v = case_data_rw (c, spec->fv);
722       di.flags = 0;
723       di.f1 = dfm_get_column (dls->reader, ss_data (field));
724       di.format = spec->input;
725       data_in (&di);
726     }
727   return true;
728 }
729
730 /* Reads a case from the data file and parses it according to
731    list-format syntax rules.  
732    Returns true if successful, false at end of file or on I/O error. */
733 static bool
734 read_from_data_list_list (const struct data_list_pgm *dls, struct ccase *c)
735 {
736   struct dls_var_spec *spec;
737
738   if (dfm_eof (dls->reader))
739     return false;
740
741   ll_for_each (spec, struct dls_var_spec, ll, &dls->specs)
742     {
743       struct substring field;
744       struct data_in di;
745
746       if (!cut_field (dls, &field))
747         {
748           if (get_undefined ())
749             msg (SW, _("Missing value(s) for all variables from %s onward.  "
750                        "These will be filled with the system-missing value "
751                        "or blanks, as appropriate."),
752                  spec->name);
753           ll_for_each_continue (spec, struct dls_var_spec, ll, &dls->specs)
754             {
755               int width = fmt_var_width (&spec->input);
756               if (width == 0)
757                 case_data_rw (c, spec->fv)->f = SYSMIS;
758               else
759                 memset (case_data_rw (c, spec->fv)->s, ' ', width); 
760             }
761           break;
762         }
763       
764       di.s = ss_data (field);
765       di.e = ss_end (field);
766       di.v = case_data_rw (c, spec->fv);
767       di.flags = 0;
768       di.f1 = dfm_get_column (dls->reader, ss_data (field));
769       di.format = spec->input;
770       data_in (&di);
771     }
772
773   dfm_forward_record (dls->reader);
774   return true;
775 }
776
777 /* Destroys DATA LIST transformation DLS.
778    Returns true if successful, false if an I/O error occurred. */
779 static bool
780 data_list_trns_free (void *dls_)
781 {
782   struct data_list_pgm *dls = dls_;
783   dfm_close_reader (dls->reader);
784   pool_destroy (dls->pool);
785   return true;
786 }
787
788 /* Handle DATA LIST transformation DLS, parsing data into C. */
789 static int
790 data_list_trns_proc (void *dls_, struct ccase *c, casenumber case_num UNUSED)
791 {
792   struct data_list_pgm *dls = dls_;
793   int retval;
794
795   if (read_from_data_list (dls, c))
796     retval = TRNS_CONTINUE;
797   else if (dfm_reader_error (dls->reader) || dfm_eof (dls->reader) > 1) 
798     {
799       /* An I/O error, or encountering end of file for a second
800          time, should be escalated into a more serious error. */
801       retval = TRNS_ERROR;
802     }
803   else
804     retval = TRNS_END_FILE;
805   
806   /* If there was an END subcommand handle it. */
807   if (dls->end != NULL) 
808     {
809       double *end = &case_data_rw (c, dls->end->fv)->f;
810       if (retval == TRNS_DROP_CASE)
811         {
812           *end = 1.0;
813           retval = TRNS_END_FILE;
814         }
815       else
816         *end = 0.0;
817     }
818
819   return retval;
820 }
821 \f
822 /* Reads all the records from the data file and passes them to
823    write_case().
824    Returns true if successful, false if an I/O error occurred. */
825 static bool
826 data_list_source_read (struct case_source *source,
827                        struct ccase *c,
828                        write_case_func *write_case, write_case_data wc_data)
829 {
830   struct data_list_pgm *dls = source->aux;
831
832   /* Skip the requested number of records before reading the
833      first case. */
834   while (dls->skip_records > 0) 
835     {
836       if (dfm_eof (dls->reader))
837         return false;
838       dfm_forward_record (dls->reader);
839       dls->skip_records--;
840     }
841   
842   for (;;) 
843     {
844       bool ok;
845
846       if (!read_from_data_list (dls, c)) 
847         return !dfm_reader_error (dls->reader);
848
849       dfm_push (dls->reader);
850       ok = write_case (wc_data);
851       dfm_pop (dls->reader);
852       if (!ok)
853         return false;
854     }
855 }
856
857 /* Destroys the source's internal data. */
858 static void
859 data_list_source_destroy (struct case_source *source)
860 {
861   data_list_trns_free (source->aux);
862 }
863
864 static const struct case_source_class data_list_source_class = 
865   {
866     "DATA LIST",
867     NULL,
868     data_list_source_read,
869     data_list_source_destroy,
870   };