work on fully reading data
author Ben Pfaff <blp@cs.stanford.edu>
Sat, 5 Jul 2025 22:40:22 +0000 (15:40 -0700)
committer Ben Pfaff <blp@cs.stanford.edu>
Sat, 5 Jul 2025 22:40:22 +0000 (15:40 -0700)
rust/pspp/src/sys/raw.rs

index d2681c6e2b88a627dc575f3950c0d1023683cc32..86f2971af11f897bd6e52907468fa06604c0a0ef 100644 (file)
@@ -766,105 +766,145 @@ impl RawDatum {
             VarType::Numeric => Datum::Number(endian.parse(raw.0)),
         }
     }
+}
 
+impl Datum {
     fn read_case<R: Read + Seek>(
         reader: &mut R,
-        var_types: &VarTypes,
+        case_vars: &[CaseVar],
         endian: Endian,
     ) -> Result<Option<Vec<Self>>, Error> {
+        fn eof<R: Seek>(
+            reader: &mut R,
+            case_vars: &[CaseVar],
+            case_start: u64,
+        ) -> Result<Option<Vec<Datum>>, Error> {
+            let offset = reader.stream_position()?;
+            if offset == case_start {
+                Ok(None)
+            } else {
+                Err(Error::EofInCase {
+                    offset,
+                    case_ofs: offset - case_start,
+                    case_len: case_vars.iter().map(CaseVar::bytes).sum(),
+                })
+            }
+        }
+
         let case_start = reader.stream_position()?;
-        let mut values = Vec::with_capacity(var_types.n_values());
-        for (i, var_type) in var_types.iter().enumerate() {
-            let Some(raw) = try_read_bytes(reader)? else {
-                if i == 0 {
-                    return Ok(None);
-                } else {
-                    let offset = reader.stream_position()?;
-                    return Err(Error::EofInCase {
-                        offset,
-                        case_ofs: offset - case_start,
-                        case_len: var_types.n_values() * 8,
-                    });
+        let mut values = Vec::with_capacity(case_vars.len());
+        for var in case_vars {
+            match var {
+                CaseVar::Numeric => {
+                    let Some(raw) = try_read_bytes(reader)? else {
+                        return eof(reader, case_vars, case_start);
+                    };
+                    values.push(Datum::Number(endian.parse(raw)));
                 }
-            };
-            values.push(Datum::from_raw(&UntypedDatum(raw), var_type, endian));
+                CaseVar::String { width, encoding } => {
+                    let mut datum = vec![0; *width];
+                    let mut offset = 0;
+                    for segment in encoding {
+                        if !try_read_bytes_into(
+                            reader,
+                            &mut datum[offset..offset + segment.data_bytes],
+                        )? {
+                            return eof(reader, case_vars, case_start);
+                        }
+                        skip_bytes(reader, segment.padding_bytes)?;
+                        offset += segment.data_bytes;
+                    }
+                    values.push(Datum::String(RawString(datum)));
+                }
+            }
         }
         Ok(Some(values))
     }
 
+    fn read_compressed_chunk<R: Read>(
+        reader: &mut R,
+        codes: &mut VecDeque<u8>,
+        endian: Endian,
+        bias: f64,
+    ) -> Result<Option<[u8; 8]>, Error> {
+        loop {
+            match codes.pop_front() {
+                Some(0) => (),
+                Some(252) => return Ok(None),
+                Some(253) => return Ok(Some(read_bytes(reader)?)),
+                Some(254) => return Ok(Some([b' '; 8])),
+                Some(255) => return Ok(Some(endian.to_bytes(-f64::MAX))),
+                Some(code) => return Ok(Some(endian.to_bytes(code as f64 - bias))),
+                None => {
+                    match try_read_bytes::<8, _>(reader)? {
+                        Some(new_codes) => codes.extend(new_codes.into_iter()),
+                        None => return Ok(None),
+                    };
+                }
+            };
+        }
+    }
     fn read_compressed_case<R: Read + Seek>(
         reader: &mut R,
-        var_types: &VarTypes,
+        case_vars: &[CaseVar],
         codes: &mut VecDeque<u8>,
         endian: Endian,
         bias: f64,
     ) -> Result<Option<Vec<Self>>, Error> {
+        fn eof<R: Seek>(
+            reader: &mut R,
+            case_vars: &[CaseVar],
+            case_start: u64,
+        ) -> Result<Option<Vec<Datum>>, Error> {
+            let offset = reader.stream_position()?;
+            if offset == case_start {
+                Ok(None)
+            } else {
+                Err(Error::EofInCase {
+                    offset,
+                    case_ofs: offset - case_start,
+                    case_len: case_vars.iter().map(CaseVar::bytes).sum(),
+                })
+            }
+        }
+
         let case_start = reader.stream_position()?;
-        let mut values = Vec::with_capacity(var_types.n_values());
-        for (i, var_type) in var_types.iter().enumerate() {
-            let value = loop {
-                let Some(code) = codes.pop_front() else {
-                    let Some(new_codes): Option<[u8; 8]> = try_read_bytes(reader)? else {
-                        if i == 0 {
-                            return Ok(None);
-                        } else {
-                            let offset = reader.stream_position()?;
-                            return Err(Error::EofInCompressedCase {
-                                offset,
-                                case_ofs: offset - case_start,
-                            });
-                        }
+        let mut values = Vec::with_capacity(case_vars.len());
+        for var in case_vars {
+            match var {
+                CaseVar::Numeric => {
+                    let Some(raw) = Self::read_compressed_chunk(reader, codes, endian, bias)?
+                    else {
+                        return eof(reader, case_vars, case_start);
                     };
-                    codes.extend(new_codes.into_iter());
-                    continue;
-                };
-                match code {
-                    0 => (),
-                    1..=251 => match var_type {
-                        VarType::Numeric => break Self::Number(Some(code as f64 - bias)),
-                        VarType::String => {
-                            break Self::String(RawStrArray(endian.to_bytes(code as f64 - bias)))
-                        }
-                    },
-                    252 => {
-                        if i == 0 {
-                            return Ok(None);
-                        } else {
-                            let offset = reader.stream_position()?;
-                            return Err(Error::PartialCompressedCase {
-                                offset,
-                                case_ofs: offset - case_start,
-                            });
+                    values.push(Datum::Number(endian.parse(raw)));
+                }
+                CaseVar::String { width, encoding } => {
+                    let mut datum = vec![0; *width];
+                    for segment in encoding {
+                        let mut data_bytes = segment.data_bytes;
+                        let mut padding_bytes = segment.padding_bytes;
+                        while data_bytes > 0 || padding_bytes > 0 {
+                            let Some(raw) =
+                                Self::read_compressed_chunk(reader, codes, endian, bias)?
+                            else {
+                                return eof(reader, case_vars, case_start);
+                            };
+                            let n_data = data_bytes.min(8);
+                            datum.extend_from_slice(&raw[..n_data]);
+                            data_bytes -= n_data;
+                            padding_bytes -= 8 - n_data;
                         }
                     }
-                    253 => {
-                        break Self::from_raw(&UntypedDatum(read_bytes(reader)?), var_type, endian)
-                    }
-                    254 => match var_type {
-                        VarType::String => break Self::String(RawStrArray(*b"        ")), // XXX EBCDIC
-                        VarType::Numeric => {
-                            return Err(Error::CompressedStringExpected {
-                                offset: case_start,
-                                case_ofs: reader.stream_position()? - case_start,
-                            })
-                        }
-                    },
-                    255 => match var_type {
-                        VarType::Numeric => break Self::Number(None),
-                        VarType::String => {
-                            return Err(Error::CompressedNumberExpected {
-                                offset: case_start,
-                                case_ofs: reader.stream_position()? - case_start,
-                            })
-                        }
-                    },
+                    values.push(Datum::String(RawString(datum)));
                 }
-            };
-            values.push(value);
+            }
         }
         Ok(Some(values))
     }
+}
 
+impl RawDatum {
     pub fn decode(&self, width: VarWidth) -> Datum {
         match self {
             Self::Number(x) => Datum::Number(*x),
@@ -1062,11 +1102,13 @@ struct StringSegment {
 
 fn segment_widths(width: usize) -> impl Iterator<Item = usize> {
     let n_segments = width.div_ceil(252);
-    repeat_n(255, n_segments - 1).chain(if n_segments > 1 {
-        std::iter::once(width - (n_segments - 1) * 252)
-    } else {
-        std::iter::once(width)
-    })
+    repeat_n(255, n_segments - 1)
+        .chain(if n_segments > 1 {
+            std::iter::once(width - (n_segments - 1) * 252)
+        } else {
+            std::iter::once(width)
+        })
+        .map(|w| w.next_multiple_of(8))
 }
 
 enum CaseVar {
@@ -1083,25 +1125,40 @@ impl CaseVar {
             VarWidth::Numeric => Self::Numeric,
             VarWidth::String(width) => {
                 let width = width as usize;
-                let mut segments = SmallVec::<[StringSegment; 1]>::new();
+                let mut encoding = SmallVec::<[StringSegment; 1]>::new();
                 let mut remaining = width;
                 for segment in segment_widths(width) {
-                    let data_bytes = remaining.min(255);
-                    let padding_bytes = data_bytes.next_multiple_of(8) - data_bytes;
-                    segments.push(StringSegment {
-                        data_bytes,
-                        padding_bytes,
-                    });
-                    remaining -= data_bytes;
+                    let data_bytes = remaining.min(segment).min(255);
+                    let padding_bytes = segment - data_bytes;
+                    if data_bytes > 0 {
+                        encoding.push(StringSegment {
+                            data_bytes,
+                            padding_bytes,
+                        });
+                        remaining -= data_bytes;
+                    } else {
+                        encoding.last_mut().unwrap().padding_bytes += padding_bytes;
+                    }
                 }
+                CaseVar::String { width, encoding }
             }
         }
     }
+
+    fn bytes(&self) -> usize { // size of this variable's slot in a case: data plus padding
+        match self {
+            CaseVar::Numeric => 8,
+            CaseVar::String { encoding, .. } => encoding
+                .iter()
+                .map(|segment| segment.data_bytes + segment.padding_bytes)
+                .sum(),
+        }
+    }
 }
 
 pub struct Cases {
     reader: Box<dyn ReadSeek>,
-    vars: Vec<CaseVar>,
+    case_vars: Vec<CaseVar>,
     compression: Option<Compression>,
     bias: f64,
     endian: Endian,
@@ -1120,13 +1177,20 @@ impl Cases {
     where
         R: Read + Seek + 'static,
     {
+        let case_vars = var_types
+            .types
+            .iter()
+            .flatten()
+            .copied()
+            .map(CaseVar::new)
+            .collect::<Vec<_>>();
         Self {
             reader: if header.compression == Some(Compression::ZLib) {
                 Box::new(ZlibDecodeMultiple::new(reader))
             } else {
                 Box::new(reader)
             },
-            var_types,
+            case_vars,
             compression: header.compression,
             bias: header.bias,
             endian: header.endian,
@@ -1147,14 +1211,14 @@ impl Iterator for Cases {
         let retval = if self.compression.is_some() {
             Datum::read_compressed_case(
                 &mut self.reader,
-                &self.var_types,
+                &self.case_vars,
                 &mut self.codes,
                 self.endian,
                 self.bias,
             )
             .transpose()
         } else {
-            Datum::read_case(&mut self.reader, &self.var_types, self.endian).transpose()
+            Datum::read_case(&mut self.reader, &self.case_vars, self.endian).transpose()
         };
         self.eof = matches!(retval, None | Some(Err(_)));
         retval
@@ -2037,10 +2101,10 @@ impl ValueLabelRecord<RawStrArray<8>, RawString> {
         let Some(&first_index) = dict_indexes.first() else {
             return Ok(None);
         };
-        let var_type = var_types.types[first_index as usize - 1].unwrap();
+        let var_type = VarType::from(var_types.types[first_index as usize - 1].unwrap());
         let mut wrong_type_indexes = Vec::new();
         dict_indexes.retain(|&index| {
-            if var_types.types[index as usize - 1] != Some(var_type) {
+            if var_types.types[index as usize - 1].map(VarType::from) != Some(var_type) {
                 wrong_type_indexes.push(index);
                 false
             } else {
@@ -3308,16 +3372,37 @@ impl ZTrailer {
     }
 }
 
-fn try_read_bytes<const N: usize, R: Read>(r: &mut R) -> Result<Option<[u8; N]>, IoError> {
-    let mut buf = [0; N];
-    let n = r.read(&mut buf)?;
+fn skip_bytes<R: Read>(r: &mut R, mut n: usize) -> Result<(), IoError> {
+    thread_local! {
+        static BUF: RefCell<[u8; 256]> = RefCell::new([0u8; 256]); // scratch for discarded bytes
+    }
+    BUF.with_borrow_mut(|buf| {
+        while n > 0 {
+            let chunk = n.min(buf.len()); // at most one buffer's worth per pass
+            r.read_exact(&mut buf[..chunk])?; // slice by `chunk`, not `n`, so n > 256 stays in bounds
+            n -= chunk;
+        }
+        Ok(())
+    })
+}
+
+fn try_read_bytes_into<R: Read>(r: &mut R, buf: &mut [u8]) -> Result<bool, IoError> {
+    let n = r.read(buf)?;
     if n > 0 {
-        if n < N {
+        if n < buf.len() {
             r.read_exact(&mut buf[n..])?;
         }
-        Ok(Some(buf))
+        Ok(true)
     } else {
-        Ok(None)
+        Ok(false)
+    }
+}
+
+fn try_read_bytes<const N: usize, R: Read>(r: &mut R) -> Result<Option<[u8; N]>, IoError> {
+    let mut buf = [0; N];
+    match try_read_bytes_into(r, &mut buf)? {
+        true => Ok(Some(buf)),
+        false => Ok(None),
     }
 }
 
@@ -3428,7 +3513,7 @@ impl LongStringValueLabelRecord<RawString, RawString> {
 
 #[derive(Default)]
 pub struct VarTypes {
-    pub types: Vec<Option<VarType>>,
+    pub types: Vec<Option<VarWidth>>,
 }
 
 impl VarTypes {
@@ -3437,8 +3522,8 @@ impl VarTypes {
     }
 
     pub fn push(&mut self, width: RawWidth) {
-        if let Ok(var_type) = VarType::try_from(width) {
-            self.types.push(Some(var_type));
+        if let Ok(var_width) = VarWidth::try_from(width) {
+            self.types.push(Some(var_width));
             for _ in 1..width.n_values().unwrap() {
                 self.types.push(None);
             }
@@ -3455,18 +3540,12 @@ impl VarTypes {
 
     pub fn var_type_at(&self, index: usize) -> Option<VarType> {
         if index >= 1 && index <= self.types.len() {
-            self.types[index - 1]
+            self.types[index - 1].map(VarType::from)
         } else {
             None
         }
     }
 
-    pub fn iter(&self) -> impl Iterator<Item = VarType> + use<'_> {
-        self.types
-            .iter()
-            .map(|var_type| var_type.unwrap_or(VarType::String))
-    }
-
     pub fn n_vars(&self) -> usize {
         self.types.iter().flatten().count()
     }