Instead of making system or portable file readers responsible for
[pspp-builds.git] / src / aggregate.c
index ec601dbffdba49806bb2d2c4c62482a1ae084d41..ffd29166142cb2bd44512ebb80cb17d2b935f7f6 100644 (file)
@@ -24,6 +24,7 @@
 #include "case.h"
 #include "casefile.h"
 #include "command.h"
+#include "dictionary.h"
 #include "error.h"
 #include "file-handle.h"
 #include "lexer.h"
@@ -31,7 +32,7 @@
 #include "moments.h"
 #include "pool.h"
 #include "settings.h"
-#include "sfm.h"
+#include "sfm-write.h"
 #include "sort.h"
 #include "str.h"
 #include "var.h"
@@ -116,7 +117,7 @@ enum missing_treatment
 struct agr_proc 
   {
     /* We have either an output file or a sink. */
-    struct file_handle *out_file;       /* Output file, or null if none. */
+    struct sfm_writer *writer;          /* Output file, or null if none. */
     struct case_sink *sink;             /* Sink, or null if none. */
 
     /* Break variables. */
@@ -130,7 +131,6 @@ struct agr_proc
     struct dictionary *dict;            /* Aggregate dictionary. */
     int case_cnt;                       /* Counts aggregated cases. */
     struct ccase agr_case;              /* Aggregate case for output. */
-    flt64 *sfm_agr_case;                /* Aggregate case in SFM format. */
   };
 
 static void initialize_aggregate_info (struct agr_proc *);
@@ -142,13 +142,11 @@ static int aggregate_single_case (struct agr_proc *agr,
                                   const struct ccase *input,
                                   struct ccase *output);
 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
-static int create_sysfile (struct agr_proc *);
 
 /* Aggregating to the active file. */
 static int agr_to_active_file (struct ccase *, void *aux);
 
 /* Aggregating to a system file. */
-static void write_case_to_sfm (struct agr_proc *agr);
 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
 \f
 /* Parsing. */
@@ -158,11 +156,12 @@ int
 cmd_aggregate (void)
 {
   struct agr_proc agr;
+  struct file_handle *out_file = NULL;
 
   /* Have we seen these subcommands? */
   unsigned seen = 0;
 
-  agr.out_file = NULL;
+  agr.writer = NULL;
   agr.sink = NULL;
   agr.missing = ITEMWISE;
   agr.sort = NULL;
@@ -186,18 +185,16 @@ cmd_aggregate (void)
          if (seen & 1)
            {
              msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
-              goto lossage;
+              goto error;
            }
          seen |= 1;
              
          lex_match ('=');
-         if (lex_match ('*'))
-           agr.out_file = NULL;
-         else 
+         if (!lex_match ('*'))
            {
-             agr.out_file = fh_parse_file_handle ();
-             if (agr.out_file == NULL)
-                goto lossage;
+             out_file = fh_parse ();
+             if (out_file == NULL)
+                goto error;
            }
        }
       else if (lex_match_id ("MISSING"))
@@ -206,7 +203,7 @@ cmd_aggregate (void)
          if (!lex_match_id ("COLUMNWISE"))
            {
              lex_error (_("while expecting COLUMNWISE"));
-              goto lossage;
+              goto error;
            }
          agr.missing = COLUMNWISE;
        }
@@ -221,7 +218,7 @@ cmd_aggregate (void)
          if (seen & 8)
            {
              msg (SE, _("%s subcommand given multiple times."),"BREAK");
-              goto lossage;
+              goto error;
            }
          seen |= 8;
 
@@ -229,7 +226,7 @@ cmd_aggregate (void)
           agr.sort = sort_parse_criteria (default_dict,
                                           &agr.break_vars, &agr.break_var_cnt);
           if (agr.sort == NULL)
-            goto lossage;
+            goto error;
          
           for (i = 0; i < agr.break_var_cnt; i++)
             {
@@ -247,7 +244,7 @@ cmd_aggregate (void)
       
   /* Read in the aggregate functions. */
   if (!parse_aggregate_functions (&agr))
-    goto lossage;
+    goto error;
 
   /* Delete documents. */
   if (!(seen & 2))
@@ -262,7 +259,7 @@ cmd_aggregate (void)
   initialize_aggregate_info (&agr);
 
   /* Output to active file or external file? */
-  if (agr.out_file == NULL) 
+  if (out_file == NULL) 
     {
       /* The active file will be replaced by the aggregated data,
          so TEMPORARY is moot. */
@@ -289,9 +286,10 @@ cmd_aggregate (void)
     }
   else
     {
-      if (!create_sysfile (&agr))
-        goto lossage;
-
+      agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression ());
+      if (agr.writer == NULL)
+        goto error;
+      
       if (agr.sort != NULL && (seen & 4) == 0) 
         {
           /* Sorting is needed. */
@@ -301,12 +299,12 @@ cmd_aggregate (void)
           
           dst = sort_active_file_to_casefile (agr.sort);
           if (dst == NULL)
-            goto lossage;
+            goto error;
           reader = casefile_get_destructive_reader (dst);
           while (casereader_read_xfer (reader, &c)) 
             {
               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
-                write_case_to_sfm (&agr);
+                sfm_write_case (agr.writer, &agr.agr_case);
               case_destroy (&c);
             }
           casereader_destroy (reader);
@@ -321,36 +319,18 @@ cmd_aggregate (void)
       if (agr.case_cnt > 0) 
         {
           dump_aggregate_info (&agr, &agr.agr_case);
-          write_case_to_sfm (&agr);
+          sfm_write_case (agr.writer, &agr.agr_case);
         }
-      fh_close_handle (agr.out_file);
     }
   
   agr_destroy (&agr);
   return CMD_SUCCESS;
 
-lossage:
+error:
   agr_destroy (&agr);
   return CMD_FAILURE;
 }
 
-/* Create a system file for use in aggregation to an external
-   file. */
-static int
-create_sysfile (struct agr_proc *agr)
-{
-  struct sfm_write_info w;
-  w.h = agr->out_file;
-  w.dict = agr->dict;
-  w.compress = get_scompression();
-  if (!sfm_write_dictionary (&w))
-    return 0;
-
-  agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size);
-    
-  return 1;
-}
-
 /* Parse all the aggregate functions. */
 static int
 parse_aggregate_functions (struct agr_proc *agr)
@@ -390,8 +370,9 @@ parse_aggregate_functions (struct agr_proc *agr)
        {
          int n_dest_prev = n_dest;
          
-         if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
-           goto lossage;
+         if (!parse_DATA_LIST_vars (&dest, &n_dest,
+                                     PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
+           goto error;
 
          /* Assign empty labels. */
          {
@@ -414,7 +395,7 @@ parse_aggregate_functions (struct agr_proc *agr)
       if (token != T_ID)
        {
          lex_error (_("expecting aggregation function"));
-         goto lossage;
+         goto error;
        }
 
       include_missing = 0;
@@ -430,7 +411,7 @@ parse_aggregate_functions (struct agr_proc *agr)
       if (NULL == function->name)
        {
          msg (SE, _("Unknown aggregation function %s."), tokid);
-         goto lossage;
+         goto error;
        }
       func_index = function - agr_func_tab;
       lex_get ();
@@ -445,7 +426,7 @@ parse_aggregate_functions (struct agr_proc *agr)
          else
            {
              lex_error (_("expecting `('"));
-             goto lossage;
+             goto error;
            }
        } else {
          /* Parse list of source variables. */
@@ -458,7 +439,7 @@ parse_aggregate_functions (struct agr_proc *agr)
              pv_opts |= PV_SAME_TYPE;
 
            if (!parse_variables (default_dict, &src, &n_src, pv_opts))
-             goto lossage;
+             goto error;
          }
 
          /* Parse function arguments, for those functions that
@@ -480,7 +461,7 @@ parse_aggregate_functions (struct agr_proc *agr)
                    type = NUMERIC;
                  } else {
                    msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
-                   goto lossage;
+                   goto error;
                  }
            
                lex_get ();
@@ -490,7 +471,7 @@ parse_aggregate_functions (struct agr_proc *agr)
                    msg (SE, _("Arguments to %s must be of same type as "
                               "source variables."),
                         function->name);
-                   goto lossage;
+                   goto error;
                  }
              }
 
@@ -498,7 +479,7 @@ parse_aggregate_functions (struct agr_proc *agr)
          if (!lex_match(')'))
            {
              lex_error (_("expecting `)'"));
-             goto lossage;
+             goto error;
            }
          
          /* Now check that the number of source variables match the
@@ -512,7 +493,7 @@ parse_aggregate_functions (struct agr_proc *agr)
              msg (SE, _("Number of source variables (%d) does not match "
                         "number of target variables (%d)."),
                   n_src, n_dest);
-             goto lossage;
+             goto error;
            }
        }
        
@@ -584,7 +565,7 @@ parse_aggregate_functions (struct agr_proc *agr)
                           "variables."),
                     dest[i]);
                free (dest[i]);
-               goto lossage;
+               goto error;
              }
 
            free (dest[i]);
@@ -636,7 +617,7 @@ parse_aggregate_functions (struct agr_proc *agr)
        }
       continue;
       
-    lossage:
+    error:
       for (i = 0; i < n_dest; i++)
        {
          free (dest[i]);
@@ -664,11 +645,11 @@ agr_destroy (struct agr_proc *agr)
 {
   struct agr_var *iter, *next;
 
-  if (agr->dict != NULL)
-    dict_destroy (agr->dict);
+  sfm_close_writer (agr->writer);
   if (agr->sort != NULL)
     sort_destroy_criteria (agr->sort);
   free (agr->break_vars);
+  free (agr->prev_break);
   for (iter = agr->agr_vars; iter; iter = next)
     {
       next = iter->next;
@@ -687,7 +668,8 @@ agr_destroy (struct agr_proc *agr)
         moments1_destroy (iter->moments);
       free (iter);
     }
-  free (agr->prev_break);
+  if (agr->dict != NULL)
+    dict_destroy (agr->dict);
   case_destroy (&agr->agr_case);
 }
 \f
@@ -1143,38 +1125,6 @@ agr_to_active_file (struct ccase *c, void *agr_)
   return 1;
 }
 
-/* Writes AGR->agr_case to AGR->out_file. */
-static void
-write_case_to_sfm (struct agr_proc *agr)
-{
-  flt64 *p;
-  int i;
-
-  p = agr->sfm_agr_case;
-  for (i = 0; i < dict_get_var_cnt (agr->dict); i++)
-    {
-      struct variable *v = dict_get_var (agr->dict, i);
-      
-      if (v->type == NUMERIC)
-       {
-         double src = case_num (&agr->agr_case, v->fv);
-         if (src == SYSMIS)
-           *p++ = -FLT64_MAX;
-         else
-           *p++ = src;
-       }
-      else
-       {
-         memcpy (p, case_str (&agr->agr_case, v->fv), v->width);
-         memset (&((char *) p)[v->width], ' ',
-                 REM_RND_UP (v->width, sizeof (flt64)));
-         p += DIV_RND_UP (v->width, sizeof (flt64));
-       }
-    }
-
-  sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case);
-}
-
 /* Aggregate the current case and output it if we passed a
    breakpoint. */
 static int
@@ -1183,7 +1133,7 @@ presorted_agr_to_sysfile (struct ccase *c, void *agr_)
   struct agr_proc *agr = agr_;
 
   if (aggregate_single_case (agr, c, &agr->agr_case)) 
-    write_case_to_sfm (agr);
+    sfm_write_case (agr->writer, &agr->agr_case);
 
   return 1;
 }