datasheet: Avoid reading a given row more than necessary in source_read(). 20130625010510/pspp 20130626010503/pspp
authorBen Pfaff <blp@cs.stanford.edu>
Tue, 25 Jun 2013 06:07:12 +0000 (23:07 -0700)
committerBen Pfaff <blp@cs.stanford.edu>
Tue, 25 Jun 2013 06:07:12 +0000 (23:07 -0700)
The datasheet casereader uses datasheet_get_row(), which calls rw_case(),
which until this commit called source_read() for every column.  The latter
was expensive if casereader_peek() actually copied data for a whole large
row.  For a datasheet with N columns, all from a single backing source,
the upshot was that datasheet_get_row() took O(N**2) time.

This commit fixes the problem in the common case by reading as many columns
as possible from a given source in a single call to source_read().

Bug #33260.

src/data/datasheet.c

index 54d268fa064100ba28c040c757df11388bb7976c..2847703d1e8f432556ae70a428a167a6ed56599b 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - a program for statistical analysis.
-   Copyright (C) 2007, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
+   Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013 Free Software Foundation, Inc.
 
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -80,9 +80,10 @@ static int source_allocate_column (struct source *, int width);
 static void source_release_column (struct source *, int ofs, int width);
 static bool source_in_use (const struct source *);
 
-static bool source_read (const struct column *, casenumber row, union value *);
+static bool source_read (const struct column *, casenumber row, union value *,
+                         size_t n);
 static bool source_write (const struct column *, casenumber row,
-                          const union value *);
+                          const union value *, size_t n);
 static bool source_write_column (struct column *, const union value *);
 static bool source_has_backing (const struct source *);
 
@@ -506,13 +507,13 @@ datasheet_resize_column (struct datasheet *ds, size_t column, int new_width,
       for (lrow = 0; lrow < n_rows; lrow++)
         {
           unsigned long int prow = axis_map (ds->rows, lrow);
-          if (!source_read (&old_col, prow, &src))
+          if (!source_read (&old_col, prow, &src, 1))
             {
               /* FIXME: back out col changes. */
               break;
             }
           resize_cb (&src, &dst, resize_cb_aux);
-          if (!source_write (col, prow, &dst))
+          if (!source_write (col, prow, &dst, 1))
             {
               /* FIXME: back out col changes. */
               break;
@@ -809,6 +810,7 @@ rw_case (struct datasheet *ds, enum rw_op op,
          casenumber lrow, size_t start_column, size_t n_columns,
          union value data[])
 {
+  struct column *columns = &ds->columns[start_column];
   casenumber prow;
   size_t i;
 
@@ -817,24 +819,34 @@ rw_case (struct datasheet *ds, enum rw_op op,
   assert (start_column + n_columns <= datasheet_get_n_columns (ds));
 
   prow = axis_map (ds->rows, lrow);
-  for (i = 0; i < n_columns; i++)
+  for (i = 0; i < n_columns; )
     {
-      struct column *c = &ds->columns[start_column + i];
-      if (c->width >= 0)
+      struct source *source = columns[i].source;
+      size_t j;
+      bool ok;
+
+      if (columns[i].width < 0)
         {
-          bool ok;
+          i++;
+          continue;
+        }
 
-          if (op == OP_READ)
-            ok = source_read (c, prow, &data[i]);
-          else
-            ok = source_write (c, prow, &data[i]);
+      for (j = i + 1; j < n_columns; j++)
+        if (columns[j].width < 0 || columns[j].source != source)
+          break;
 
-          if (!ok)
-            {
-              taint_set_taint (ds->taint);
-              return false;
-            }
+      if (op == OP_READ)
+        ok = source_read (&columns[i], prow, &data[i], j - i);
+      else
+        ok = source_write (&columns[i], prow, &data[i], j - i);
+
+      if (!ok)
+        {
+          taint_set_taint (ds->taint);
+          return false;
         }
+
+      i = j;
     }
   return true;
 }
@@ -1339,30 +1351,39 @@ source_get_backing_n_rows (const struct source *source)
   return source->backing_rows;
 }
 
-/* Reads the given COLUMN from SOURCE in the given ROW, into
-   VALUE.  Returns true if successful, false on I/O error.
+/* Reads the N COLUMNS in the given ROW, into the N VALUES.  Returns true if
+   successful, false on I/O error.
 
-   The caller must have initialized VALUE with the proper
-   width. */
+   All of the COLUMNS must have the same source.
+
+   The caller must have initialized VALUES with the proper width. */
 static bool
-source_read (const struct column *column, casenumber row, union value *value)
+source_read (const struct column columns[], casenumber row,
+             union value values[], size_t n)
 {
-  struct source *source = column->source;
+  struct source *source = columns[0].source;
+  size_t i;
 
-  assert (column->width >= 0);
   if (source->backing == NULL
       || sparse_xarray_contains_row (source->data, row))
-    return sparse_xarray_read (source->data, row, column->byte_ofs,
-                               width_to_n_bytes (column->width),
-                               value_to_data (value, column->width));
+    {
+      bool ok = true;
+
+      for (i = 0; i < n && ok; i++)
+        ok = sparse_xarray_read (source->data, row, columns[i].byte_ofs,
+                                 width_to_n_bytes (columns[i].width),
+                                 value_to_data (&values[i], columns[i].width));
+      return ok;
+    }
   else
     {
       struct ccase *c = casereader_peek (source->backing, row);
       bool ok = c != NULL;
       if (ok)
         {
-          value_copy (value, case_data_idx (c, column->value_ofs),
-                      column->width);
+          for (i = 0; i < n; i++)
+            value_copy (&values[i], case_data_idx (c, columns[i].value_ofs),
+                        columns[i].width);
           case_unref (c);
         }
       return ok;
@@ -1394,18 +1415,20 @@ copy_case_into_source (struct source *source, struct ccase *c, casenumber row)
   return true;
 }
 
-/* Writes VALUE to SOURCE in the given ROW and COLUMN.  Returns
-   true if successful, false on I/O error.  On error, the row's
-   data may be completely or partially corrupted, both inside and
-   outside the region to be written.  */
+/* Writes the N VALUES to their source in the given ROW and COLUMNS.  Returns
+   true if successful, false on I/O error.  On error, the row's data may be
+   completely or partially corrupted, both inside and outside the region to be
+   written.
+
+   All of the COLUMNS must have the same source. */
 static bool
-source_write (const struct column *column, casenumber row,
-              const union value *value)
+source_write (const struct column columns[], casenumber row,
+              const union value values[], size_t n)
 {
-  struct source *source = column->source;
+  struct source *source = columns[0].source;
   struct casereader *backing = source->backing;
+  size_t i;
 
-  assert (column->width >= 0);
   if (backing != NULL
       && !sparse_xarray_contains_row (source->data, row)
       && row < source->backing_rows)
@@ -1423,9 +1446,12 @@ source_write (const struct column *column, casenumber row,
         return false;
     }
 
-  return sparse_xarray_write (source->data, row, column->byte_ofs,
-                              width_to_n_bytes (column->width),
-                              value_to_data (value, column->width));
+  for (i = 0; i < n; i++)
+    if (!sparse_xarray_write (source->data, row, columns[i].byte_ofs,
+                              width_to_n_bytes (columns[i].width),
+                              value_to_data (&values[i], columns[i].width)))
+      return false;
+  return true;
 }
 
 /* Within SOURCE, which must not have a backing casereader,