From a68e3f51236286fe02c6c4b6d40efa1bcec36349 Mon Sep 17 00:00:00 2001 From: Ben Pfaff Date: Wed, 6 May 2009 20:22:09 -0700 Subject: [PATCH] sparse-array: Improve iteration interface. The sparse_array_scan function only supports iteration in the forward direction and its interface is somewhat awkward. This commit replaces it by four new functions that allow iteration in both forward and reverse directions and have a more conventional interface. --- Smake | 1 + src/data/sparse-cases.c | 16 +- src/libpspp/sparse-array.c | 317 +++++++++++++++++++++++------- src/libpspp/sparse-array.h | 13 +- tests/libpspp/sparse-array-test.c | 18 +- 5 files changed, 276 insertions(+), 89 deletions(-) diff --git a/Smake b/Smake index f6ab964c..e5ff997a 100644 --- a/Smake +++ b/Smake @@ -11,6 +11,7 @@ GNULIB_MODULES = \ c-ctype \ c-strtod \ close \ + count-one-bits \ crypto/md4 \ dirname \ environ \ diff --git a/src/data/sparse-cases.c b/src/data/sparse-cases.c index 13fcf788..7abe4291 100644 --- a/src/data/sparse-cases.c +++ b/src/data/sparse-cases.c @@ -80,8 +80,8 @@ sparse_cases_clone (const struct sparse_cases *old) struct ccase **cp; new->memory = sparse_array_create (sizeof (struct ccase *)); - for (cp = sparse_array_scan (old->memory, NULL, &idx); cp != NULL; - cp = sparse_array_scan (old->memory, &idx, &idx)) + for (cp = sparse_array_first (old->memory, &idx); cp != NULL; + cp = sparse_array_next (old->memory, idx, &idx)) { struct ccase **ncp = sparse_array_insert (new->memory, idx); *ncp = case_ref (*cp); @@ -133,8 +133,8 @@ sparse_cases_destroy (struct sparse_cases *sc) { unsigned long int idx; struct ccase **cp; - for (cp = sparse_array_scan (sc->memory, NULL, &idx); cp != NULL; - cp = sparse_array_scan (sc->memory, &idx, &idx)) + for (cp = sparse_array_first (sc->memory, &idx); cp != NULL; + cp = sparse_array_next (sc->memory, idx, &idx)) case_unref (*cp); sparse_array_destroy (sc->memory); } @@ -167,8 +167,8 @@ dump_sparse_cases_to_disk (struct sparse_cases *sc) sc->disk = case_tmpfile_create (sc->column_cnt); sc->disk_cases = range_set_create (); - for (cp = sparse_array_scan (sc->memory, NULL, &idx); cp != NULL; - cp = sparse_array_scan (sc->memory, &idx, &idx)) + for (cp = sparse_array_first (sc->memory, &idx); cp != NULL; + cp = sparse_array_next (sc->memory, idx, &idx)) { if (!case_tmpfile_put_case (sc->disk, idx, *cp)) { @@ -324,8 +324,8 @@ sparse_cases_write_columns (struct sparse_cases *sc, size_t start_column, struct ccase **cp; unsigned long int idx; - for (cp = sparse_array_scan (sc->memory, NULL, &idx); cp != NULL; - cp = sparse_array_scan (sc->memory, &idx, &idx)) + for (cp = sparse_array_first (sc->memory, &idx); cp != NULL; + cp = sparse_array_next (sc->memory, idx, &idx)) { *cp = case_unshare (*cp); case_copy_in (*cp, start_column, values, value_cnt); diff --git a/src/libpspp/sparse-array.c b/src/libpspp/sparse-array.c index 6471bbac..ebc81ff4 100644 --- a/src/libpspp/sparse-array.c +++ b/src/libpspp/sparse-array.c @@ -1,5 +1,5 @@ /* PSPP - a program for statistical analysis. - Copyright (C) 2007 Free Software Foundation, Inc. + Copyright (C) 2007, 2009 Free Software Foundation, Inc. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -25,6 +25,9 @@ #include #include +#include "count-one-bits.h" +#include "minmax.h" + /* Sparse array data structure. The sparse array is implemented in terms of a "radix tree", a @@ -48,6 +51,7 @@ /* Maximum height. */ #define LONG_BITS (sizeof (unsigned long int) * CHAR_BIT) +#define LONG_MASK ((unsigned long int) LONG_BITS - 1) #define MAX_HEIGHT DIV_RND_UP (LONG_BITS, BITS_PER_LEVEL) /* Bit-mask for index. */ @@ -113,7 +117,6 @@ static inline bool is_in_use (const struct leaf_node *, unsigned int); static inline bool any_in_use (const struct leaf_node *); static inline void set_in_use (struct leaf_node *, unsigned int); static inline void unset_in_use (struct leaf_node *, unsigned int); -static inline int scan_in_use (struct leaf_node *, unsigned int); static inline void *leaf_element (const struct sparse_array *, struct leaf_node *, unsigned int); static inline size_t leaf_size (const struct sparse_array *); @@ -121,10 +124,20 @@ static inline size_t leaf_size (const struct sparse_array *); static struct leaf_node *find_leaf_node (const struct sparse_array *, unsigned long int); static void decrease_height (struct sparse_array *); -static void *scan_leaf (struct sparse_array *, struct leaf_node *, - unsigned long int, unsigned long int *); -static void *do_scan (struct sparse_array *, union pointer *, int, - unsigned long int, unsigned long int *); + +static int scan_in_use_forward (struct leaf_node *, unsigned int); +static void *do_scan_forward (struct sparse_array *, union pointer *, int, + unsigned long int, unsigned long int *); +static void *scan_forward (const struct sparse_array *, + unsigned long int start, + unsigned long int *found); + +static int scan_in_use_reverse (struct leaf_node *, unsigned int); +static void *do_scan_reverse (struct sparse_array *, union pointer *, int, + unsigned long int, unsigned long int *); +static void *scan_reverse (const struct sparse_array *, + unsigned long int start, + unsigned long int *found); /* Creates and returns a new sparse array that will contain elements that are ELEM_SIZE bytes in size. */ @@ -355,46 +368,46 @@ sparse_array_remove (struct sparse_array *spar, unsigned long int key) return true; } -/* Scans SPAR in increasing order of keys for in-use elements. - If SKIP is NULL, the scan starts from key 0; - otherwise, it starts just after key *SKIP. - If an element is found, returns it and stores the element's - key into *FOUND; otherwise, returns a null pointer and does - not modify *FOUND. */ +/* Returns a pointer to the in-use element with the smallest + index and sets *IDXP to its index or, if SPAR has no in-use + elements, returns NULL without modifying *IDXP. */ void * -sparse_array_scan (const struct sparse_array *spar_, unsigned long int *skip, - unsigned long int *found) +sparse_array_first (const struct sparse_array *spar, unsigned long int *idxp) { - struct sparse_array *spar = (struct sparse_array *) spar_; - unsigned long int start; + return scan_forward (spar, 0, idxp); +} - /* Find our starting point. */ - if (skip != NULL) - { - start = *skip + 1; - if (start == 0) - return NULL; - } - else - start = 0; +/* Returns a pointer to the in-use element with the smallest + index greater than SKIP and sets *IDXP to its index or, if + SPAR has no in-use elements with index greater than SKIP, + returns NULL without modifying *IDXP. */ +void * +sparse_array_next (const struct sparse_array *spar, unsigned long int skip, + unsigned long int *idxp) +{ + return skip < ULONG_MAX ? scan_forward (spar, skip + 1, idxp) : NULL; +} - /* Check the cache. */ - if (start >> BITS_PER_LEVEL == spar->cache_ofs) - { - void *p = scan_leaf (spar, spar->cache, start, found); - if (p) - return p; - start &= ~LEVEL_MASK; - start += PTRS_PER_LEVEL; - if (start == 0) - return NULL; - } +/* Returns a pointer to the in-use element with the greatest + index and sets *IDXP to its index or, if SPAR has no in-use + elements, returns NULL without modifying *IDXP. */ +void * +sparse_array_last (const struct sparse_array *spar, unsigned long int *idxp) +{ + return scan_reverse (spar, ULONG_MAX, idxp); +} - /* Do the scan. */ - if (!index_in_range (spar, start)) - return NULL; - return do_scan (spar, &spar->root, spar->height - 1, start, found); +/* Returns a pointer to the in-use element with the greatest + index less than SKIP and sets *IDXP to its index or, if SPAR + has no in-use elements with index less than SKIP, returns NULL + without modifying *IDXP. */ +void * +sparse_array_prev (const struct sparse_array *spar, unsigned long int skip, + unsigned long int *idxp) +{ + return skip > 0 ? scan_reverse (spar, skip - 1, idxp) : NULL; } + /* Returns true iff KEY is in the range of keys currently represented by SPAR. */ @@ -476,21 +489,65 @@ count_trailing_zeros (unsigned long int x) } /* Returns the least index of the in-use element in LEAF greater - than or equal to IDX. */ -static inline int -scan_in_use (struct leaf_node *leaf, unsigned int idx) + than or equal to IDX. Bits in IDX not in LEVEL_MASK are + considered to have value 0. */ +static int +scan_in_use_forward (struct leaf_node *leaf, unsigned int idx) { - for (; idx < PTRS_PER_LEVEL; idx = (idx & ~(LONG_BITS - 1)) + LONG_BITS) + idx &= LEVEL_MASK; + for (; idx < PTRS_PER_LEVEL; idx = (idx & ~LONG_MASK) + LONG_BITS) { int ofs = idx % LONG_BITS; unsigned long int in_use = leaf->in_use[idx / LONG_BITS] >> ofs; if (!in_use) continue; - return count_trailing_zeros (in_use) + idx; + return idx + count_trailing_zeros (in_use); } return -1; } +/* Returns the number of leading 0-bits in X. + Undefined if X is zero. */ +static inline int +count_leading_zeros (unsigned long int x) +{ +#if __GNUC__ >= 4 + return __builtin_clzl (x); +#else + x |= x >> 1; + x |= x >> 2; + x |= x >> 4; + x |= x >> 8; + x |= x >> 16; +#if ULONG_MAX >> 31 >> 1 + x |= x >> 32; +#endif +#if ULONG_MAX >> 31 >> 31 >> 2 + x |= x >> 64; +#endif + return LONG_BITS - count_one_bits_l (x); +#endif +} + +/* Returns the greatest index of the in-use element in LEAF less + than or equal to IDX. Bits in IDX not in LEVEL_MASK are + considered to have value 0. */ +static int +scan_in_use_reverse (struct leaf_node *leaf, unsigned int idx) +{ + idx &= LEVEL_MASK; + for (;;) + { + int ofs = idx % LONG_BITS; + unsigned long int in_use = leaf->in_use[idx / LONG_BITS] << (31 - ofs); + if (in_use) + return idx - count_leading_zeros (in_use); + if (idx < LONG_BITS) + return -1; + idx = (idx | LONG_MASK) - LONG_BITS; + } +} + /* Returns the address of element with the given KEY in LEAF, which is a node in SPAR. */ static inline void * @@ -501,6 +558,18 @@ leaf_element (const struct sparse_array *spar, struct leaf_node *leaf, return (char *) leaf + SIZEOF_ALIGNED (*leaf) + (spar->elem_size * key); } +/* As leaf_element, returns the address of element with the given + KEY in LEAF, which is a node in SPAR. Also, updates SPAR's + cache to contain LEAF. */ +static void * +cache_leaf_element (struct sparse_array *spar, struct leaf_node *leaf, + unsigned long int key) +{ + spar->cache = leaf; + spar->cache_ofs = key >> BITS_PER_LEVEL; + return leaf_element (spar, leaf, key); +} + /* Returns the size of a leaf node in SPAR. */ static inline size_t leaf_size (const struct sparse_array *spar) @@ -557,24 +626,36 @@ decrease_height (struct sparse_array *spar) } } -/* Scans leaf node LEAF, which is in SPAR, for the in-use element - with the least key greater than or equal to START. If such an - element is found, returns a pointer to it and stores its key - in *FOUND; otherwise, returns a null pointer and does not - modify *FOUND. */ -static void * -scan_leaf (struct sparse_array *spar, struct leaf_node *leaf, - unsigned long int start, unsigned long int *found) +/* Scans P, which is at LEVEL and within SPAR, and its subnodes, + for the in-use element with the least key greater than or + equal to START. If such an element is found, returns a + pointer to it and stores its key in *FOUND; otherwise, returns + a null pointer and does not modify *FOUND. */ +static inline void * +scan_internal_node_forward (struct sparse_array *spar, + struct internal_node *node, + int level, unsigned long int start, + unsigned long int *found) { - int idx = scan_in_use (leaf, start & LEVEL_MASK); - if (idx >= 0) + int shift = level * BITS_PER_LEVEL; + int count = node->count; + int i; + + for (i = (start >> shift) & LEVEL_MASK; i < PTRS_PER_LEVEL; i++) { - *found = (start & ~LEVEL_MASK) | idx; - spar->cache = leaf; - spar->cache_ofs = *found >> BITS_PER_LEVEL; - return leaf_element (spar, leaf, idx); - } + union pointer *q = &node->down[i]; + if (level > 1 ? q->internal != NULL : q->leaf != NULL) + { + void *element = do_scan_forward (spar, q, level - 1, start, found); + if (element) + return element; + if (--count == 0) + return NULL; + } + start &= ~((1ul << shift) - 1); + start += 1ul << shift; + } return NULL; } @@ -583,44 +664,134 @@ scan_leaf (struct sparse_array *spar, struct leaf_node *leaf, equal to START. If such an element is found, returns a pointer to it and stores its key in *FOUND; otherwise, returns a null pointer and does not modify *FOUND. */ +static void * +do_scan_forward (struct sparse_array *spar, union pointer *p, int level, + unsigned long int start, unsigned long int *found) +{ + if (level == 0) + { + int idx = scan_in_use_forward (p->leaf, start); + if (idx >= 0) + { + unsigned long int key = *found = (start & ~LEVEL_MASK) | idx; + return cache_leaf_element (spar, p->leaf, key); + } + } + return scan_internal_node_forward (spar, p->internal, level, start, found); +} + +static void * +scan_forward (const struct sparse_array *spar_, unsigned long int start, + unsigned long int *found) +{ + struct sparse_array *spar = (struct sparse_array *) spar_; + + /* Check the cache. */ + if (start >> BITS_PER_LEVEL == spar->cache_ofs) + { + int idx = scan_in_use_forward (spar->cache, start); + if (idx >= 0) + { + *found = (start & ~LEVEL_MASK) | idx; + return leaf_element (spar, spar->cache, idx); + } + start = (start & ~LEVEL_MASK) + PTRS_PER_LEVEL; + if (start == 0) + return NULL; + } + + /* Do the scan. */ + if (!index_in_range (spar, start)) + return NULL; + return do_scan_forward (spar, &spar->root, spar->height - 1, start, found); +} + +/* Scans P, which is at LEVEL and within SPAR, and its subnodes, + for the in-use element with the greatest key less than or + equal to START. If such an element is found, returns a + pointer to it and stores its key in *FOUND; otherwise, returns + a null pointer and does not modify *FOUND. */ static inline void * -scan_internal_node (struct sparse_array *spar, struct internal_node *node, - int level, unsigned long int start, - unsigned long int *found) +scan_internal_node_reverse (struct sparse_array *spar, + struct internal_node *node, + int level, unsigned long int start, + unsigned long int *found) { int shift = level * BITS_PER_LEVEL; int count = node->count; int i; - for (i = (start >> shift) & LEVEL_MASK; i < PTRS_PER_LEVEL; i++) + for (i = (start >> shift) & LEVEL_MASK; i >= 0; i--) { union pointer *q = &node->down[i]; if (level > 1 ? q->internal != NULL : q->leaf != NULL) { - void *element = do_scan (spar, q, level - 1, start, found); + void *element = do_scan_reverse (spar, q, level - 1, start, found); if (element) return element; if (--count == 0) return NULL; } - start &= ~((1ul << shift) - 1); - start += 1ul << shift; + start |= (1ul << shift) - 1; + start -= 1ul << shift; } return NULL; } /* Scans P, which is at LEVEL and within SPAR, and its subnodes, - for the in-use element with the least key greater than or + for the in-use element with the greatest key less than or equal to START. If such an element is found, returns a pointer to it and stores its key in *FOUND; otherwise, returns a null pointer and does not modify *FOUND. */ static void * -do_scan (struct sparse_array *spar, union pointer *p, int level, - unsigned long int start, unsigned long int *found) +do_scan_reverse (struct sparse_array *spar, union pointer *p, int level, + unsigned long int start, unsigned long int *found) { - return (level == 0 - ? scan_leaf (spar, p->leaf, start, found) - : scan_internal_node (spar, p->internal, level, start, found)); + if (level == 0) + { + int idx = scan_in_use_reverse (p->leaf, start); + if (idx >= 0) + { + unsigned long int key = *found = (start & ~LEVEL_MASK) | idx; + return cache_leaf_element (spar, p->leaf, key); + } + else + return NULL; + } + return scan_internal_node_reverse (spar, p->internal, level, start, found); } +static void * +scan_reverse (const struct sparse_array *spar_, unsigned long int start, + unsigned long int *found) +{ + struct sparse_array *spar = (struct sparse_array *) spar_; + + /* Check the cache. */ + if (start >> BITS_PER_LEVEL == spar->cache_ofs) + { + int idx = scan_in_use_reverse (spar->cache, start); + if (idx >= 0) + { + *found = (start & ~LEVEL_MASK) | idx; + return leaf_element (spar, spar->cache, idx); + } + start |= LEVEL_MASK; + if (start < PTRS_PER_LEVEL) + return NULL; + start -= PTRS_PER_LEVEL; + } + else + { + if (spar->height == 0) + return NULL; + else if (spar->height < MAX_HEIGHT) + { + unsigned long int max_key; + max_key = (1ul << (spar->height * BITS_PER_LEVEL)) - 1; + start = MIN (start, max_key); + } + } + return do_scan_reverse (spar, &spar->root, spar->height - 1, start, found); +} diff --git a/src/libpspp/sparse-array.h b/src/libpspp/sparse-array.h index e8be2e18..56360cab 100644 --- a/src/libpspp/sparse-array.h +++ b/src/libpspp/sparse-array.h @@ -1,5 +1,5 @@ /* PSPP - a program for statistical analysis. - Copyright (C) 2007 Free Software Foundation, Inc. + Copyright (C) 2007, 2009 Free Software Foundation, Inc. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -49,8 +49,13 @@ void *sparse_array_insert (struct sparse_array *, unsigned long int key); void *sparse_array_get (const struct sparse_array *, unsigned long int key); bool sparse_array_remove (struct sparse_array *, unsigned long int key); -void *sparse_array_scan (const struct sparse_array *, - unsigned long int *skip, - unsigned long int *key); +void *sparse_array_first (const struct sparse_array *, + unsigned long int *idxp); +void *sparse_array_next (const struct sparse_array *, + unsigned long int skip, unsigned long int *idxp); +void *sparse_array_last (const struct sparse_array *, + unsigned long int *idxp); +void *sparse_array_prev (const struct sparse_array *, + unsigned long int skip, unsigned long int *idxp); #endif /* libpspp/sparse-array.h */ diff --git a/tests/libpspp/sparse-array-test.c b/tests/libpspp/sparse-array-test.c index c8d14b2b..e146c3db 100644 --- a/tests/libpspp/sparse-array-test.c +++ b/tests/libpspp/sparse-array-test.c @@ -1,5 +1,5 @@ /* PSPP - a program for statistical analysis. - Copyright (C) 2007 Free Software Foundation, Inc. + Copyright (C) 2007, 2009 Free Software Foundation, Inc. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -18,7 +18,8 @@ in sparse-array.c. This test program aims to be as comprehensive as possible. "gcov -b" should report 100% coverage of lines and branches in sparse-array.c when compiled - with -DNDEBUG. "valgrind --leak-check=yes + with -DNDEBUG and BITS_PER_LEVEL is greater than the number of + bits in a long. "valgrind --leak-check=yes --show-reachable=yes" should give a clean report. */ #ifdef HAVE_CONFIG_H @@ -164,8 +165,8 @@ check_sparse_array (struct sparse_array *spar, check (!sparse_array_remove (spar, order[cnt - 1] + 1)); } - for (i = 0, p = sparse_array_scan (spar, NULL, &idx); i < cnt; - i++, p = sparse_array_scan (spar, &idx, &idx)) + for (i = 0, p = sparse_array_first (spar, &idx); i < cnt; + i++, p = sparse_array_next (spar, idx, &idx)) { check (p != NULL); check (idx == order[i]); @@ -173,6 +174,15 @@ check_sparse_array (struct sparse_array *spar, } check (p == NULL); + for (i = 0, p = sparse_array_last (spar, &idx); i < cnt; + i++, p = sparse_array_prev (spar, idx, &idx)) + { + check (p != NULL); + check (idx == order[cnt - i - 1]); + check (*p == order[cnt - i - 1]); + } + check (p == NULL); + free (order); } -- 2.30.2