sparse-array: Improve iteration interface.
authorBen Pfaff <blp@gnu.org>
Thu, 7 May 2009 03:22:09 +0000 (20:22 -0700)
committerBen Pfaff <blp@gnu.org>
Sun, 7 Jun 2009 04:11:05 +0000 (21:11 -0700)
The sparse_array_scan function only supports iteration in the forward
direction and its interface is somewhat awkward.  This commit replaces it
by four new functions that allow iteration in both forward and reverse
directions and have a more conventional interface.

Smake
src/data/sparse-cases.c
src/libpspp/sparse-array.c
src/libpspp/sparse-array.h
tests/libpspp/sparse-array-test.c

diff --git a/Smake b/Smake
index f6ab964c014f348828da4a8f6c44fb80abe9e198..e5ff997a876d6384997d2495f1673638a81ffbba 100644 (file)
--- a/Smake
+++ b/Smake
@@ -11,6 +11,7 @@ GNULIB_MODULES = \
        c-ctype \
        c-strtod \
        close \
+       count-one-bits \
        crypto/md4 \
        dirname \
        environ \
index 13fcf7888feae74f2a2489f7ea5df3c678807a71..7abe429147a893af7a48bf88eb5db99cb25313ae 100644 (file)
@@ -80,8 +80,8 @@ sparse_cases_clone (const struct sparse_cases *old)
       struct ccase **cp;
 
       new->memory = sparse_array_create (sizeof (struct ccase *));
-      for (cp = sparse_array_scan (old->memory, NULL, &idx); cp != NULL;
-           cp = sparse_array_scan (old->memory, &idx, &idx))
+      for (cp = sparse_array_first (old->memory, &idx); cp != NULL;
+           cp = sparse_array_next (old->memory, idx, &idx))
         {
           struct ccase **ncp = sparse_array_insert (new->memory, idx);
           *ncp = case_ref (*cp);
@@ -133,8 +133,8 @@ sparse_cases_destroy (struct sparse_cases *sc)
         {
           unsigned long int idx;
           struct ccase **cp;
-          for (cp = sparse_array_scan (sc->memory, NULL, &idx); cp != NULL;
-               cp = sparse_array_scan (sc->memory, &idx, &idx))
+          for (cp = sparse_array_first (sc->memory, &idx); cp != NULL;
+               cp = sparse_array_next (sc->memory, idx, &idx))
             case_unref (*cp);
           sparse_array_destroy (sc->memory);
         }
@@ -167,8 +167,8 @@ dump_sparse_cases_to_disk (struct sparse_cases *sc)
   sc->disk = case_tmpfile_create (sc->column_cnt);
   sc->disk_cases = range_set_create ();
 
-  for (cp = sparse_array_scan (sc->memory, NULL, &idx); cp != NULL;
-       cp = sparse_array_scan (sc->memory, &idx, &idx))
+  for (cp = sparse_array_first (sc->memory, &idx); cp != NULL;
+       cp = sparse_array_next (sc->memory, idx, &idx))
     {
       if (!case_tmpfile_put_case (sc->disk, idx, *cp))
         {
@@ -324,8 +324,8 @@ sparse_cases_write_columns (struct sparse_cases *sc, size_t start_column,
       struct ccase **cp;
       unsigned long int idx;
 
-      for (cp = sparse_array_scan (sc->memory, NULL, &idx); cp != NULL;
-           cp = sparse_array_scan (sc->memory, &idx, &idx))
+      for (cp = sparse_array_first (sc->memory, &idx); cp != NULL;
+           cp = sparse_array_next (sc->memory, idx, &idx))
         {
           *cp = case_unshare (*cp);
           case_copy_in (*cp, start_column, values, value_cnt);
index 6471bbacf44a83ad919477c35d5c7b6b2ed1a37d..ebc81ff45137734736069abf6eaaa0d13d4b0fae 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - a program for statistical analysis.
-   Copyright (C) 2007 Free Software Foundation, Inc.
+   Copyright (C) 2007, 2009 Free Software Foundation, Inc.
 
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -25,6 +25,9 @@
 #include <libpspp/misc.h>
 #include <libpspp/pool.h>
 
+#include "count-one-bits.h"
+#include "minmax.h"
+
 /* Sparse array data structure.
 
    The sparse array is implemented in terms of a "radix tree", a
@@ -48,6 +51,7 @@
 
 /* Maximum height. */
 #define LONG_BITS (sizeof (unsigned long int) * CHAR_BIT)
+#define LONG_MASK ((unsigned long int) LONG_BITS - 1)
 #define MAX_HEIGHT DIV_RND_UP (LONG_BITS, BITS_PER_LEVEL)
 
 /* Bit-mask for index. */
@@ -113,7 +117,6 @@ static inline bool is_in_use (const struct leaf_node *, unsigned int);
 static inline bool any_in_use (const struct leaf_node *);
 static inline void set_in_use (struct leaf_node *, unsigned int);
 static inline void unset_in_use (struct leaf_node *, unsigned int);
-static inline int scan_in_use (struct leaf_node *, unsigned int);
 static inline void *leaf_element (const struct sparse_array *,
                                   struct leaf_node *, unsigned int);
 static inline size_t leaf_size (const struct sparse_array *);
@@ -121,10 +124,20 @@ static inline size_t leaf_size (const struct sparse_array *);
 static struct leaf_node *find_leaf_node (const struct sparse_array *,
                                          unsigned long int);
 static void decrease_height (struct sparse_array *);
-static void *scan_leaf (struct sparse_array *, struct leaf_node *,
-                        unsigned long int, unsigned long int *);
-static void *do_scan (struct sparse_array *, union pointer *, int,
-                      unsigned long int, unsigned long int *);
+
+static int scan_in_use_forward (struct leaf_node *, unsigned int);
+static void *do_scan_forward (struct sparse_array *, union pointer *, int,
+                              unsigned long int, unsigned long int *);
+static void *scan_forward (const struct sparse_array *,
+                           unsigned long int start,
+                           unsigned long int *found);
+
+static int scan_in_use_reverse (struct leaf_node *, unsigned int);
+static void *do_scan_reverse (struct sparse_array *, union pointer *, int,
+                              unsigned long int, unsigned long int *);
+static void *scan_reverse (const struct sparse_array *,
+                           unsigned long int start,
+                           unsigned long int *found);
 \f
 /* Creates and returns a new sparse array that will contain
    elements that are ELEM_SIZE bytes in size. */
@@ -355,46 +368,46 @@ sparse_array_remove (struct sparse_array *spar, unsigned long int key)
   return true;
 }
 
-/* Scans SPAR in increasing order of keys for in-use elements.
-   If SKIP is NULL, the scan starts from key 0;
-   otherwise, it starts just after key *SKIP.
-   If an element is found, returns it and stores the element's
-   key into *FOUND; otherwise, returns a null pointer and does
-   not modify *FOUND. */
+/* Returns a pointer to the in-use element with the smallest
+   index and sets *IDXP to its index or, if SPAR has no in-use
+   elements, returns NULL without modifying *IDXP. */
 void *
-sparse_array_scan (const struct sparse_array *spar_, unsigned long int *skip,
-                   unsigned long int *found)
+sparse_array_first (const struct sparse_array *spar, unsigned long int *idxp)
 {
-  struct sparse_array *spar = (struct sparse_array *) spar_;
-  unsigned long int start;
+  return scan_forward (spar, 0, idxp);
+}
 
-  /* Find our starting point. */
-  if (skip != NULL)
-    {
-      start = *skip + 1;
-      if (start == 0)
-        return NULL;
-    }
-  else
-    start = 0;
+/* Returns a pointer to the in-use element with the smallest
+   index greater than SKIP and sets *IDXP to its index or, if
+   SPAR has no in-use elements with index greater than SKIP,
+   returns NULL without modifying *IDXP. */
+void *
+sparse_array_next (const struct sparse_array *spar, unsigned long int skip,
+                   unsigned long int *idxp)
+{
+  return skip < ULONG_MAX ? scan_forward (spar, skip + 1, idxp) : NULL;
+}
 
-  /* Check the cache. */
-  if (start >> BITS_PER_LEVEL == spar->cache_ofs)
-    {
-      void *p = scan_leaf (spar, spar->cache, start, found);
-      if (p)
-        return p;
-      start &= ~LEVEL_MASK;
-      start += PTRS_PER_LEVEL;
-      if (start == 0)
-        return NULL;
-    }
+/* Returns a pointer to the in-use element with the greatest
+   index and sets *IDXP to its index or, if SPAR has no in-use
+   elements, returns NULL without modifying *IDXP. */
+void *
+sparse_array_last (const struct sparse_array *spar, unsigned long int *idxp)
+{
+  return scan_reverse (spar, ULONG_MAX, idxp);
+}
 
-  /* Do the scan. */
-  if (!index_in_range (spar, start))
-    return NULL;
-  return do_scan (spar, &spar->root, spar->height - 1, start, found);
+/* Returns a pointer to the in-use element with the greatest
+   index less than SKIP and sets *IDXP to its index or, if SPAR
+   has no in-use elements with index less than SKIP, returns NULL
+   without modifying *IDXP. */
+void *
+sparse_array_prev (const struct sparse_array *spar, unsigned long int skip,
+                   unsigned long int *idxp)
+{
+  return skip > 0 ? scan_reverse (spar, skip - 1, idxp) : NULL;
 }
+
 \f
 /* Returns true iff KEY is in the range of keys currently
    represented by SPAR. */
@@ -476,21 +489,65 @@ count_trailing_zeros (unsigned long int x)
 }
 
 /* Returns the least index of the in-use element in LEAF greater
-   than or equal to IDX. */
-static inline int
-scan_in_use (struct leaf_node *leaf, unsigned int idx)
+   than or equal to IDX.  Bits in IDX not in LEVEL_MASK are
+   considered to have value 0. */
+static int
+scan_in_use_forward (struct leaf_node *leaf, unsigned int idx)
 {
-  for (; idx < PTRS_PER_LEVEL; idx = (idx & ~(LONG_BITS - 1)) + LONG_BITS)
+  idx &= LEVEL_MASK;
+  for (; idx < PTRS_PER_LEVEL; idx = (idx & ~LONG_MASK) + LONG_BITS)
     {
       int ofs = idx % LONG_BITS;
       unsigned long int in_use = leaf->in_use[idx / LONG_BITS] >> ofs;
       if (!in_use)
         continue;
-      return count_trailing_zeros (in_use) + idx;
+      return idx + count_trailing_zeros (in_use);
     }
   return -1;
 }
 
+/* Returns the number of leading 0-bits in X.
+   Undefined if X is zero. */
+static inline int
+count_leading_zeros (unsigned long int x)
+{
+#if __GNUC__ >= 4
+  return __builtin_clzl (x);
+#else
+  x |= x >> 1;
+  x |= x >> 2;
+  x |= x >> 4;
+  x |= x >> 8;
+  x |= x >> 16;
+#if ULONG_MAX >> 31 >> 1
+  x |= x >> 32;
+#endif
+#if ULONG_MAX >> 31 >> 31 >> 2
+  x |= x >> 64;
+#endif
+  return LONG_BITS - count_one_bits_l (x);
+#endif
+}
+
+/* Returns the greatest index of the in-use element in LEAF less
+   than or equal to IDX.  Bits in IDX not in LEVEL_MASK are
+   considered to have value 0. */
+static int
+scan_in_use_reverse (struct leaf_node *leaf, unsigned int idx)
+{
+  idx &= LEVEL_MASK;
+  for (;;)
+    {
+      int ofs = idx % LONG_BITS;
+      unsigned long int in_use = leaf->in_use[idx / LONG_BITS] << (31 - ofs);
+      if (in_use)
+        return idx - count_leading_zeros (in_use);
+      if (idx < LONG_BITS)
+        return -1;
+      idx = (idx | LONG_MASK) - LONG_BITS;
+    }
+}
+
 /* Returns the address of element with the given KEY in LEAF,
    which is a node in SPAR. */
 static inline void *
@@ -501,6 +558,18 @@ leaf_element (const struct sparse_array *spar, struct leaf_node *leaf,
   return (char *) leaf + SIZEOF_ALIGNED (*leaf) + (spar->elem_size * key);
 }
 
+/* As leaf_element, returns the address of element with the given
+   KEY in LEAF, which is a node in SPAR.  Also, updates SPAR's
+   cache to contain LEAF. */
+static void *
+cache_leaf_element (struct sparse_array *spar, struct leaf_node *leaf,
+                    unsigned long int key)
+{
+  spar->cache = leaf;
+  spar->cache_ofs = key >> BITS_PER_LEVEL;
+  return leaf_element (spar, leaf, key);
+}
+
 /* Returns the size of a leaf node in SPAR. */
 static inline size_t
 leaf_size (const struct sparse_array *spar)
@@ -557,24 +626,36 @@ decrease_height (struct sparse_array *spar)
     }
 }
 
-/* Scans leaf node LEAF, which is in SPAR, for the in-use element
-   with the least key greater than or equal to START.  If such an
-   element is found, returns a pointer to it and stores its key
-   in *FOUND; otherwise, returns a null pointer and does not
-   modify *FOUND. */
-static void *
-scan_leaf (struct sparse_array *spar, struct leaf_node *leaf,
-           unsigned long int start, unsigned long int *found)
+/* Scans P, which is at LEVEL and within SPAR, and its subnodes,
+   for the in-use element with the least key greater than or
+   equal to START.  If such an element is found, returns a
+   pointer to it and stores its key in *FOUND; otherwise, returns
+   a null pointer and does not modify *FOUND. */
+static inline void *
+scan_internal_node_forward (struct sparse_array *spar,
+                            struct internal_node *node,
+                            int level, unsigned long int start,
+                            unsigned long int *found)
 {
-  int idx = scan_in_use (leaf, start & LEVEL_MASK);
-  if (idx >= 0)
+  int shift = level * BITS_PER_LEVEL;
+  int count = node->count;
+  int i;
+
+  for (i = (start >> shift) & LEVEL_MASK; i < PTRS_PER_LEVEL; i++)
     {
-      *found = (start & ~LEVEL_MASK) | idx;
-      spar->cache = leaf;
-      spar->cache_ofs = *found >> BITS_PER_LEVEL;
-      return leaf_element (spar, leaf, idx);
-    }
+      union pointer *q = &node->down[i];
+      if (level > 1 ? q->internal != NULL : q->leaf != NULL)
+        {
+          void *element = do_scan_forward (spar, q, level - 1, start, found);
+          if (element)
+            return element;
+          if (--count == 0)
+            return NULL;
+        }
 
+      start &= ~((1ul << shift) - 1);
+      start += 1ul << shift;
+    }
   return NULL;
 }
 
@@ -583,44 +664,134 @@ scan_leaf (struct sparse_array *spar, struct leaf_node *leaf,
    equal to START.  If such an element is found, returns a
    pointer to it and stores its key in *FOUND; otherwise, returns
    a null pointer and does not modify *FOUND. */
+static void *
+do_scan_forward (struct sparse_array *spar, union pointer *p, int level,
+                 unsigned long int start, unsigned long int *found)
+{
+  if (level == 0)
+    {
+      int idx = scan_in_use_forward (p->leaf, start);
+      if (idx >= 0)
+        {
+          unsigned long int key = *found = (start & ~LEVEL_MASK) | idx;
+          return cache_leaf_element (spar, p->leaf, key);
+        }
+    }
+  return scan_internal_node_forward (spar, p->internal, level, start, found);
+}
+
+static void *
+scan_forward (const struct sparse_array *spar_, unsigned long int start,
+              unsigned long int *found)
+{
+  struct sparse_array *spar = (struct sparse_array *) spar_;
+
+  /* Check the cache. */
+  if (start >> BITS_PER_LEVEL == spar->cache_ofs)
+    {
+      int idx = scan_in_use_forward (spar->cache, start);
+      if (idx >= 0)
+        {
+          *found = (start & ~LEVEL_MASK) | idx;
+          return leaf_element (spar, spar->cache, idx);
+        }
+      start = (start & ~LEVEL_MASK) + PTRS_PER_LEVEL;
+      if (start == 0)
+        return NULL;
+    }
+
+  /* Do the scan. */
+  if (!index_in_range (spar, start))
+    return NULL;
+  return do_scan_forward (spar, &spar->root, spar->height - 1, start, found);
+}
+
+/* Scans P, which is at LEVEL and within SPAR, and its subnodes,
+   for the in-use element with the greatest key less than or
+   equal to START.  If such an element is found, returns a
+   pointer to it and stores its key in *FOUND; otherwise, returns
+   a null pointer and does not modify *FOUND. */
 static inline void *
-scan_internal_node (struct sparse_array *spar, struct internal_node *node,
-                    int level, unsigned long int start,
-                    unsigned long int *found)
+scan_internal_node_reverse (struct sparse_array *spar,
+                            struct internal_node *node,
+                            int level, unsigned long int start,
+                            unsigned long int *found)
 {
   int shift = level * BITS_PER_LEVEL;
   int count = node->count;
   int i;
 
-  for (i = (start >> shift) & LEVEL_MASK; i < PTRS_PER_LEVEL; i++)
+  for (i = (start >> shift) & LEVEL_MASK; i >= 0; i--)
     {
       union pointer *q = &node->down[i];
       if (level > 1 ? q->internal != NULL : q->leaf != NULL)
         {
-          void *element = do_scan (spar, q, level - 1, start, found);
+          void *element = do_scan_reverse (spar, q, level - 1, start, found);
           if (element)
             return element;
           if (--count == 0)
             return NULL;
         }
 
-      start &= ~((1ul << shift) - 1);
-      start += 1ul << shift;
+      start |= (1ul << shift) - 1;
+      start -= 1ul << shift;
     }
   return NULL;
 }
 
 /* Scans P, which is at LEVEL and within SPAR, and its subnodes,
-   for the in-use element with the least key greater than or
+   for the in-use element with the greatest key less than or
    equal to START.  If such an element is found, returns a
    pointer to it and stores its key in *FOUND; otherwise, returns
    a null pointer and does not modify *FOUND. */
 static void *
-do_scan (struct sparse_array *spar, union pointer *p, int level,
-         unsigned long int start, unsigned long int *found)
+do_scan_reverse (struct sparse_array *spar, union pointer *p, int level,
+                 unsigned long int start, unsigned long int *found)
 {
-  return (level == 0
-          ? scan_leaf (spar, p->leaf, start, found)
-          : scan_internal_node (spar, p->internal, level, start, found));
+  if (level == 0)
+    {
+      int idx = scan_in_use_reverse (p->leaf, start);
+      if (idx >= 0)
+        {
+          unsigned long int key = *found = (start & ~LEVEL_MASK) | idx;
+          return cache_leaf_element (spar, p->leaf, key);
+        }
+      else
+        return NULL;
+    }
+  return scan_internal_node_reverse (spar, p->internal, level, start, found);
 }
 
+static void *
+scan_reverse (const struct sparse_array *spar_, unsigned long int start,
+              unsigned long int *found)
+{
+  struct sparse_array *spar = (struct sparse_array *) spar_;
+
+  /* Check the cache. */
+  if (start >> BITS_PER_LEVEL == spar->cache_ofs)
+    {
+      int idx = scan_in_use_reverse (spar->cache, start);
+      if (idx >= 0)
+        {
+          *found = (start & ~LEVEL_MASK) | idx;
+          return leaf_element (spar, spar->cache, idx);
+        }
+      start |= LEVEL_MASK;
+      if (start < PTRS_PER_LEVEL)
+        return NULL;
+      start -= PTRS_PER_LEVEL;
+    }
+  else
+    {
+      if (spar->height == 0)
+        return NULL;
+      else if (spar->height < MAX_HEIGHT)
+        {
+          unsigned long int max_key;
+          max_key = (1ul << (spar->height * BITS_PER_LEVEL)) - 1;
+          start = MIN (start, max_key);
+        }
+    }
+  return do_scan_reverse (spar, &spar->root, spar->height - 1, start, found);
+}
index e8be2e182bc37fa48fd72964ef52c9d4be58db4b..56360cabc0c47c4f11eb52f17af8fe91b5f088a1 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - a program for statistical analysis.
-   Copyright (C) 2007 Free Software Foundation, Inc.
+   Copyright (C) 2007, 2009 Free Software Foundation, Inc.
 
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -49,8 +49,13 @@ void *sparse_array_insert (struct sparse_array *, unsigned long int key);
 void *sparse_array_get (const struct sparse_array *, unsigned long int key);
 bool sparse_array_remove (struct sparse_array *, unsigned long int key);
 
-void *sparse_array_scan (const struct sparse_array *,
-                         unsigned long int *skip,
-                         unsigned long int *key);
+void *sparse_array_first (const struct sparse_array *,
+                          unsigned long int *idxp);
+void *sparse_array_next (const struct sparse_array *,
+                         unsigned long int skip, unsigned long int *idxp);
+void *sparse_array_last (const struct sparse_array *,
+                          unsigned long int *idxp);
+void *sparse_array_prev (const struct sparse_array *,
+                         unsigned long int skip, unsigned long int *idxp);
 
 #endif /* libpspp/sparse-array.h */
index c8d14b2bb98f51a95bdf84955cd1b64cc4c87155..e146c3dbf537b7c765dfec60a16267756f8b4356 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - a program for statistical analysis.
-   Copyright (C) 2007 Free Software Foundation, Inc.
+   Copyright (C) 2007, 2009 Free Software Foundation, Inc.
 
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -18,7 +18,8 @@
    in sparse-array.c.  This test program aims to be as
    comprehensive as possible.  "gcov -b" should report 100%
    coverage of lines and branches in sparse-array.c when compiled
-   with -DNDEBUG.  "valgrind --leak-check=yes
+   with -DNDEBUG and BITS_PER_LEVEL is greater than the number of
+   bits in a long.  "valgrind --leak-check=yes
    --show-reachable=yes" should give a clean report. */
 
 #ifdef HAVE_CONFIG_H
@@ -164,8 +165,8 @@ check_sparse_array (struct sparse_array *spar,
       check (!sparse_array_remove (spar, order[cnt - 1] + 1));
     }
 
-  for (i = 0, p = sparse_array_scan (spar, NULL, &idx); i < cnt;
-       i++, p = sparse_array_scan (spar, &idx, &idx))
+  for (i = 0, p = sparse_array_first (spar, &idx); i < cnt;
+       i++, p = sparse_array_next (spar, idx, &idx))
     {
       check (p != NULL);
       check (idx == order[i]);
@@ -173,6 +174,15 @@ check_sparse_array (struct sparse_array *spar,
     }
   check (p == NULL);
 
+  for (i = 0, p = sparse_array_last (spar, &idx); i < cnt;
+       i++, p = sparse_array_prev (spar, idx, &idx))
+    {
+      check (p != NULL);
+      check (idx == order[cnt - i - 1]);
+      check (*p == order[cnt - i - 1]);
+    }
+  check (p == NULL);
+
   free (order);
 }