X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fprocedure.c;h=9b94da01910e4743c3cc58955d7024e5ef2637b9;hb=b0bf9b1b0f727fafac4296a048e3f45db5936f81;hp=67f22c5c0dfca5a0e0442feb0a8888973bd222a1;hpb=2766c2d47448010527d52dd304213d0bb563dd00;p=pspp-builds.git diff --git a/src/procedure.c b/src/procedure.c index 67f22c5c..9b94da01 100644 --- a/src/procedure.c +++ b/src/procedure.c @@ -35,9 +35,9 @@ #include #include #include +#include #include #include -#include #include #include #include @@ -71,32 +71,40 @@ struct write_case_data size_t cases_analyzed; /* Cases passed to procedure so far. */ }; -/* The current active file, from which cases are read. */ -struct case_source *vfm_source; - -/* The replacement active file, to which cases are written. */ -struct case_sink *vfm_sink; - -/* The compactor used to compact a compact, if necessary; +/* Cases are read from vfm_source, + pass through permanent_trns_chain (which transforms them into + the format described by permanent_dict), + are written to vfm_sink, + pass through temporary_trns_chain (which transforms them into + the format described by default_dict), + and are finally passed to the procedure. */ +static struct case_source *vfm_source; +static struct trns_chain *permanent_trns_chain; +static struct dictionary *permanent_dict; +static struct case_sink *vfm_sink; +static struct trns_chain *temporary_trns_chain; +struct dictionary *default_dict; + +/* The transformation chain that the next transformation will be + added to. */ +static struct trns_chain *cur_trns_chain; + +/* The compactor used to compact a case, if necessary; otherwise a null pointer. */ static struct dict_compactor *compactor; /* Time at which vfm was last invoked. */ static time_t last_vfm_invocation; -/* Whether we're inside a procedure. - For debugging purposes only. */ -static bool in_procedure; - /* Lag queue. */ int n_lag; /* Number of cases to lag. */ static int lag_count; /* Number of cases in lag_queue so far. */ static int lag_head; /* Index where next case will be added. */ static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */ -/* Active transformations. */ -struct transformation *t_trns; -size_t n_trns, m_trns, f_trns; +static void add_case_limit_trns (void); +static void add_filter_trns (void); +static void add_process_if_trns (void); static bool internal_procedure (bool (*proc_func) (struct ccase *, void *), void *aux); @@ -104,11 +112,6 @@ static void update_last_vfm_invocation (void); static void create_trns_case (struct ccase *, struct dictionary *); static void open_active_file (void); static bool write_case (struct write_case_data *wc_data); -static int execute_transformations (struct ccase *c, - struct transformation *trns, - int first_idx, int last_idx, - int case_num); -static int filter_case (const struct ccase *c, int case_num); static void lag_case (const struct ccase *c); static void clear_case (struct ccase *c); static bool close_active_file (void); @@ -126,26 +129,17 @@ time_of_last_procedure (void) /* Reads the data from the input program and writes it to a new active file. For each case we read from the input program, we - do the following + do the following: 1. Execute permanent transformations. If these drop the case, start the next case from step 1. - 2. N OF CASES. If we have already written N cases, start the - next case from step 1. - - 3. Write case to replacement active file. + 2. Write case to replacement active file. - 4. Execute temporary transformations. If these drop the case, + 3. Execute temporary transformations. If these drop the case, start the next case from step 1. - 5. FILTER, PROCESS IF. If these drop the case, start the next - case from step 1. - - 6. Post-TEMPORARY N OF CASES. If we have already analyzed N - cases, start the next case from step 1. - - 7. Pass case to PROC_FUNC, passing AUX as auxiliary data. + 4. Pass case to PROC_FUNC, passing AUX as auxiliary data. Returns true if successful, false if an I/O error occurred. */ bool @@ -154,10 +148,14 @@ procedure (bool (*proc_func) (struct ccase *, void *), void *aux) if (proc_func == NULL && case_source_is_class (vfm_source, &storage_source_class) && vfm_sink == NULL - && !temporary - && n_trns == 0) + && temporary_trns_chain == NULL + && trns_chain_is_empty (permanent_trns_chain)) { - /* Nothing to do. */ + expr_free (process_if_expr); + process_if_expr = NULL; + dict_set_case_limit (default_dict, 0); + dict_clear_vectors (default_dict); + update_last_vfm_invocation (); return true; } @@ -167,8 +165,57 @@ procedure (bool (*proc_func) (struct ccase *, void *), void *aux) open_active_file (); ok = internal_procedure (proc_func, aux); - if (!close_active_file ()) - ok = false; + ok = close_active_file () && ok; + + return ok; + } +} + +/* Callback function for multipass_procedure(). */ +static bool +multipass_callback (struct ccase *c, void *cf_) +{ + struct casefile *cf = cf_; + return casefile_append (cf, c); +} + +/* Procedure that allows multiple passes over the input data. + The entire active file is passed to PROC_FUNC, with the given + AUX as auxiliary data, as a unit. */ +bool +multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux), + void *aux) +{ + if (case_source_is_class (vfm_source, &storage_source_class) + && vfm_sink == NULL + && temporary_trns_chain == NULL + && trns_chain_is_empty (permanent_trns_chain)) + { + proc_func (storage_source_get_casefile (vfm_source), aux); + + expr_free (process_if_expr); + process_if_expr = NULL; + dict_set_case_limit (default_dict, 0); + dict_clear_vectors (default_dict); + + update_last_vfm_invocation (); + return true; + } + else + { + struct casefile *cf; + bool ok; + + assert (proc_func != NULL); + + cf = casefile_create (dict_get_next_value_idx (default_dict)); + + open_active_file (); + ok = internal_procedure (multipass_callback, cf); + ok = proc_func (cf, aux) && ok; + ok = close_active_file () && ok; + + casefile_destroy (cf); return ok; } @@ -237,25 +284,26 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict) static void open_active_file (void) { - assert (!in_procedure); - in_procedure = true; + add_case_limit_trns (); + add_filter_trns (); + add_process_if_trns (); - /* Make temp_dict refer to the dictionary right before data - reaches the sink */ - if (!temporary) - { - temp_trns = n_trns; - temp_dict = default_dict; - } + /* Finalize transformations. */ + trns_chain_finalize (cur_trns_chain); + + /* Make permanent_dict refer to the dictionary right before + data reaches the sink. */ + if (permanent_dict == NULL) + permanent_dict = default_dict; /* Figure out compaction. */ - compactor = (dict_needs_compaction (temp_dict) - ? dict_make_compactor (temp_dict) + compactor = (dict_needs_compaction (permanent_dict) + ? dict_make_compactor (permanent_dict) : NULL); /* Prepare sink. */ if (vfm_sink == NULL) - vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL); + vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL); if (vfm_sink->class->open != NULL) vfm_sink->class->open (vfm_sink); @@ -270,9 +318,6 @@ open_active_file (void) for (i = 0; i < n_lag; i++) case_nullify (&lag_queue[i]); } - - /* Close any unclosed DO IF or LOOP constructs. */ - ctl_stack_clear (); } /* Transforms trns_case and writes it to the replacement active @@ -282,25 +327,22 @@ open_active_file (void) static bool write_case (struct write_case_data *wc_data) { - int retval; + enum trns_result retval; + size_t case_nr; /* Execute permanent transformations. */ - retval = execute_transformations (&wc_data->trns_case, t_trns, f_trns, - temp_trns, wc_data->cases_written + 1); - if (retval != 1) + case_nr = wc_data->cases_written + 1; + retval = trns_chain_execute (permanent_trns_chain, + &wc_data->trns_case, &case_nr); + if (retval != TRNS_CONTINUE) goto done; - /* N OF CASES. */ - if (dict_get_case_limit (default_dict) - && wc_data->cases_written >= dict_get_case_limit (default_dict)) - goto done; - wc_data->cases_written++; - /* Write case to LAG queue. */ if (n_lag) lag_case (&wc_data->trns_case); /* Write case to replacement active file. */ + wc_data->cases_written++; if (vfm_sink->class->write != NULL) { if (compactor != NULL) @@ -314,94 +356,24 @@ write_case (struct write_case_data *wc_data) } /* Execute temporary transformations. */ - retval = execute_transformations (&wc_data->trns_case, t_trns, temp_trns, - n_trns, wc_data->cases_written); - if (retval != 1) - goto done; - - /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */ - if (filter_case (&wc_data->trns_case, wc_data->cases_written) - || (dict_get_case_limit (temp_dict) - && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict))) - goto done; - wc_data->cases_analyzed++; + if (temporary_trns_chain != NULL) + { + retval = trns_chain_execute (temporary_trns_chain, + &wc_data->trns_case, + &wc_data->cases_written); + if (retval != TRNS_CONTINUE) + goto done; + } /* Pass case to procedure. */ + wc_data->cases_analyzed++; if (wc_data->proc_func != NULL) if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux)) - retval = -1; + retval = TRNS_ERROR; done: clear_case (&wc_data->trns_case); - return retval != -1; -} - -/* Transforms case C using the transformations in TRNS[] with - indexes FIRST_IDX through LAST_IDX, exclusive. Case C will - become case CASE_NUM (1-based) in the output file. Returns 1 - if the case was successfully transformed, 0 if it was filtered - out by one of the transformations, or -1 if the procedure - should be abandoned due to a fatal error. */ -static int -execute_transformations (struct ccase *c, - struct transformation *trns, - int first_idx, int last_idx, - int case_num) -{ - int idx; - - for (idx = first_idx; idx != last_idx; ) - { - struct transformation *t = &trns[idx]; - int retval = t->proc (t->private, c, case_num); - switch (retval) - { - case TRNS_CONTINUE: - idx++; - break; - - case TRNS_DROP_CASE: - return 0; - - case TRNS_ERROR: - return -1; - - case TRNS_NEXT_CASE: - abort (); - - case TRNS_END_FILE: - abort (); - - default: - idx = retval; - break; - } - } - - return 1; -} - -/* Returns nonzero if case C with case number CASE_NUM should be - excluded as specified on FILTER or PROCESS IF, otherwise - zero. */ -static int -filter_case (const struct ccase *c, int case_idx) -{ - /* FILTER. */ - struct variable *filter_var = dict_get_filter (default_dict); - if (filter_var != NULL) - { - double f = case_num (c, filter_var->fv); - if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f)) - return 1; - } - - /* PROCESS IF. */ - if (process_if_expr != NULL - && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0) - return 1; - - return 0; + return retval != TRNS_ERROR; } /* Add C to the lag queue. */ @@ -452,13 +424,8 @@ close_active_file (void) n_lag = 0; } - /* Dictionary from before TEMPORARY becomes permanent.. */ - if (temporary) - { - dict_destroy (default_dict); - default_dict = temp_dict; - temp_dict = NULL; - } + /* Dictionary from before TEMPORARY becomes permanent. */ + proc_cancel_temporary_transformations (); /* Finish compaction. */ if (compactor != NULL) @@ -479,16 +446,9 @@ close_active_file (void) /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors, and get rid of all the transformations. */ - cancel_temporary (); - expr_free (process_if_expr); - process_if_expr = NULL; - dict_set_case_limit (default_dict, 0); dict_clear_vectors (default_dict); - - assert (in_procedure); - in_procedure = false; - - return cancel_transformations (); + permanent_dict = NULL; + return proc_cancel_all_transformations (); } /* Returns a pointer to the lagged case from N_BEFORE cases before the @@ -509,56 +469,6 @@ lagged_case (int n_before) else return NULL; } - -/* Appends TRNS to t_trns[], the list of all transformations to be - performed on data as it is read from the active file. */ -void -add_transformation (trns_proc_func *proc, trns_free_func *free, void *private) -{ - struct transformation *trns; - - assert (!in_procedure); - - if (n_trns >= m_trns) - t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns); - trns = &t_trns[n_trns++]; - trns->proc = proc; - trns->free = free; - trns->private = private; -} - -/* Returns the index number that the next transformation added by - add_transformation() will receive. A trns_proc_func that - returns this index causes control flow to jump to it. */ -size_t -next_transformation (void) -{ - return n_trns; -} - -/* Cancels all active transformations, including any transformations - created by the input program. - Returns true if successful, false if an I/O error occurred. */ -bool -cancel_transformations (void) -{ - bool ok = true; - size_t i; - for (i = 0; i < n_trns; i++) - { - struct transformation *t = &t_trns[i]; - if (t->free != NULL) - { - if (!t->free (t->private)) - ok = false; - } - } - n_trns = f_trns = 0; - free (t_trns); - t_trns = NULL; - m_trns = 0; - return ok; -} /* Represents auxiliary data for handling SPLIT FILE. */ struct split_aux_data @@ -714,7 +624,7 @@ struct multipass_split_aux_data }; static bool multipass_split_callback (struct ccase *c, void *aux_); -static void multipass_split_output (struct multipass_split_aux_data *); +static bool multipass_split_output (struct multipass_split_aux_data *); /* Returns true if successful, false if an I/O error occurred. */ bool @@ -736,7 +646,7 @@ multipass_procedure_with_splits (bool (*split_func) (const struct casefile *, ok = internal_procedure (multipass_split_callback, &aux); if (aux.casefile != NULL) - multipass_split_output (&aux); + ok = multipass_split_output (&aux) && ok; case_destroy (&aux.prev_case); if (!close_active_file ()) @@ -750,13 +660,14 @@ static bool multipass_split_callback (struct ccase *c, void *aux_) { struct multipass_split_aux_data *aux = aux_; + bool ok = true; /* Start a new series if needed. */ if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case)) { /* Pass any cases to split_func. */ if (aux->casefile != NULL) - multipass_split_output (aux); + ok = multipass_split_output (aux); /* Start a new casefile. */ aux->casefile = casefile_create (dict_get_next_value_idx (default_dict)); @@ -767,18 +678,21 @@ multipass_split_callback (struct ccase *c, void *aux_) case_clone (&aux->prev_case, c); } - return casefile_append (aux->casefile, c); + return casefile_append (aux->casefile, c) && ok; } -static void +static bool multipass_split_output (struct multipass_split_aux_data *aux) { + bool ok; + assert (aux->casefile != NULL); - aux->split_func (aux->casefile, aux->func_aux); + ok = aux->split_func (aux->casefile, aux->func_aux); casefile_destroy (aux->casefile); aux->casefile = NULL; -} + return ok; +} /* Discards all the current state in preparation for a data-input command like DATA LIST or GET. */ @@ -796,12 +710,307 @@ discard_variables (void) vfm_source = NULL; } - cancel_transformations (); - - ctl_stack_clear (); + proc_cancel_all_transformations (); expr_free (process_if_expr); process_if_expr = NULL; - cancel_temporary (); + proc_cancel_temporary_transformations (); +} + +/* Returns the current set of permanent transformations, + and clears the permanent transformations. + For use by INPUT PROGRAM. */ +struct trns_chain * +proc_capture_transformations (void) +{ + struct trns_chain *chain; + + assert (temporary_trns_chain == NULL); + chain = permanent_trns_chain; + cur_trns_chain = permanent_trns_chain = trns_chain_create (); + return chain; +} + +/* Adds a transformation that processes a case with PROC and + frees itself with FREE to the current set of transformations. + The functions are passed AUX as auxiliary data. */ +void +add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux) +{ + trns_chain_append (cur_trns_chain, NULL, proc, free, aux); +} + +/* Adds a transformation that processes a case with PROC and + frees itself with FREE to the current set of transformations. + When parsing of the block of transformations is complete, + FINALIZE will be called. + The functions are passed AUX as auxiliary data. */ +void +add_transformation_with_finalizer (trns_finalize_func *finalize, + trns_proc_func *proc, + trns_free_func *free, void *aux) +{ + trns_chain_append (cur_trns_chain, finalize, proc, free, aux); +} + +/* Returns the index of the next transformation. + This value can be returned by a transformation procedure + function to indicate a "jump" to that transformation. */ +size_t +next_transformation (void) +{ + return trns_chain_next (cur_trns_chain); +} + +/* Returns true if the next call to add_transformation() will add + a temporary transformation, false if it will add a permanent + transformation. */ +bool +proc_in_temporary_transformations (void) +{ + return temporary_trns_chain != NULL; +} + +/* Marks the start of temporary transformations. + Further calls to add_transformation() will add temporary + transformations. */ +void +proc_start_temporary_transformations (void) +{ + if (!proc_in_temporary_transformations ()) + { + add_case_limit_trns (); + + permanent_dict = dict_clone (default_dict); + trns_chain_finalize (permanent_trns_chain); + temporary_trns_chain = cur_trns_chain = trns_chain_create (); + } +} + +/* Converts all the temporary transformations, if any, to + permanent transformations. Further transformations will be + permanent. + Returns true if anything changed, false otherwise. */ +bool +proc_make_temporary_transformations_permanent (void) +{ + if (proc_in_temporary_transformations ()) + { + trns_chain_finalize (temporary_trns_chain); + trns_chain_splice (permanent_trns_chain, temporary_trns_chain); + temporary_trns_chain = NULL; + + dict_destroy (permanent_dict); + permanent_dict = NULL; + + return true; + } + else + return false; +} + +/* Cancels all temporary transformations, if any. Further + transformations will be permanent. + Returns true if anything changed, false otherwise. */ +bool +proc_cancel_temporary_transformations (void) +{ + if (proc_in_temporary_transformations ()) + { + dict_destroy (default_dict); + default_dict = permanent_dict; + permanent_dict = NULL; + + trns_chain_destroy (temporary_trns_chain); + temporary_trns_chain = NULL; + + return true; + } + else + return false; +} + +/* Cancels all transformations, if any. + Returns true if successful, false on I/O error. */ +bool +proc_cancel_all_transformations (void) +{ + bool ok; + ok = trns_chain_destroy (permanent_trns_chain); + ok = trns_chain_destroy (temporary_trns_chain) && ok; + permanent_trns_chain = cur_trns_chain = trns_chain_create (); + temporary_trns_chain = NULL; + return ok; +} + +/* Initializes procedure handling. */ +void +proc_init (void) +{ + default_dict = dict_create (); + proc_cancel_all_transformations (); +} + +/* Finishes up procedure handling. */ +void +proc_done (void) +{ + discard_variables (); +} + +/* Sets SINK as the destination for procedure output from the + next procedure. */ +void +proc_set_sink (struct case_sink *sink) +{ + assert (vfm_sink == NULL); + vfm_sink = sink; +} + +/* Sets SOURCE as the source for procedure input for the next + procedure. */ +void +proc_set_source (struct case_source *source) +{ + assert (vfm_source == NULL); + vfm_source = source; +} + +/* Returns true if a source for the next procedure has been + configured, false otherwise. */ +bool +proc_has_source (void) +{ + return vfm_source != NULL; +} + +/* Returns the output from the previous procedure. + For use only immediately after executing a procedure. + The returned casefile is owned by the caller; it will not be + automatically used for the next procedure's input. */ +struct casefile * +proc_capture_output (void) +{ + struct casefile *casefile; + + /* Try to make sure that this function is called immediately + after procedure() or a similar function. */ + assert (vfm_source != NULL); + assert (case_source_is_class (vfm_source, &storage_source_class)); + assert (trns_chain_is_empty (permanent_trns_chain)); + assert (!proc_in_temporary_transformations ()); + + casefile = storage_source_decapsulate (vfm_source); + vfm_source = NULL; + + return casefile; +} + +static trns_proc_func case_limit_trns_proc; +static trns_free_func case_limit_trns_free; + +/* Adds a transformation that limits the number of cases that may + pass through, if default_dict has a case limit. */ +static void +add_case_limit_trns (void) +{ + size_t case_limit = dict_get_case_limit (default_dict); + if (case_limit != 0) + { + size_t *cases_remaining = xmalloc (sizeof *cases_remaining); + *cases_remaining = case_limit; + add_transformation (case_limit_trns_proc, case_limit_trns_free, + cases_remaining); + dict_set_case_limit (default_dict, 0); + } +} + +/* Limits the maximum number of cases processed to + *CASES_REMAINING. */ +static int +case_limit_trns_proc (void *cases_remaining_, + struct ccase *c UNUSED, int case_nr UNUSED) +{ + size_t *cases_remaining = cases_remaining_; + if (*cases_remaining > 0) + { + *cases_remaining--; + return TRNS_CONTINUE; + } + else + return TRNS_DROP_CASE; +} + +/* Frees the data associated with a case limit transformation. */ +static bool +case_limit_trns_free (void *cases_remaining_) +{ + size_t *cases_remaining = cases_remaining_; + free (cases_remaining); + return true; +} + +static trns_proc_func filter_trns_proc; + +/* Adds a temporary transformation to filter data according to + the variable specified on FILTER, if any. */ +static void +add_filter_trns (void) +{ + struct variable *filter_var = dict_get_filter (default_dict); + if (filter_var != NULL) + { + proc_start_temporary_transformations (); + add_transformation (filter_trns_proc, NULL, filter_var); + } +} + +/* FILTER transformation. */ +static int +filter_trns_proc (void *filter_var_, + struct ccase *c UNUSED, int case_nr UNUSED) + +{ + struct variable *filter_var = filter_var_; + double f = case_num (c, filter_var->fv); + return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f) + ? TRNS_CONTINUE : TRNS_DROP_CASE); +} + +static trns_proc_func process_if_trns_proc; +static trns_free_func process_if_trns_free; + +/* Adds a temporary transformation to filter data according to + the expression specified on PROCESS IF, if any. */ +static void +add_process_if_trns (void) +{ + if (process_if_expr != NULL) + { + proc_start_temporary_transformations (); + add_transformation (process_if_trns_proc, process_if_trns_free, + process_if_expr); + process_if_expr = NULL; + } +} + +/* PROCESS IF transformation. */ +static int +process_if_trns_proc (void *expression_, + struct ccase *c UNUSED, int case_nr UNUSED) + +{ + struct expression *expression = expression_; + return (expr_evaluate_num (expression, c, case_nr) == 1.0 + ? TRNS_CONTINUE : TRNS_DROP_CASE); +} + +/* Frees a PROCESS IF transformation. */ +static bool +process_if_trns_free (void *expression_) +{ + struct expression *expression = expression_; + expr_free (expression); + return true; }