From: Ben Pfaff Date: Sat, 5 Jul 2025 22:40:22 +0000 (-0700) Subject: work on fully reading data X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4d484983261d3522918626981aca747ac81c3e09;p=pspp work on fully reading data --- diff --git a/rust/pspp/src/sys/raw.rs b/rust/pspp/src/sys/raw.rs index d2681c6e2b..86f2971af1 100644 --- a/rust/pspp/src/sys/raw.rs +++ b/rust/pspp/src/sys/raw.rs @@ -766,105 +766,145 @@ impl RawDatum { VarType::Numeric => Datum::Number(endian.parse(raw.0)), } } +} +impl Datum { fn read_case( reader: &mut R, - var_types: &VarTypes, + case_vars: &[CaseVar], endian: Endian, ) -> Result>, Error> { + fn eof( + reader: &mut R, + case_vars: &[CaseVar], + case_start: u64, + ) -> Result>, Error> { + let offset = reader.stream_position()?; + if offset == case_start { + Ok(None) + } else { + Err(Error::EofInCase { + offset, + case_ofs: offset - case_start, + case_len: case_vars.iter().map(CaseVar::bytes).sum(), + }) + } + } + let case_start = reader.stream_position()?; - let mut values = Vec::with_capacity(var_types.n_values()); - for (i, var_type) in var_types.iter().enumerate() { - let Some(raw) = try_read_bytes(reader)? else { - if i == 0 { - return Ok(None); - } else { - let offset = reader.stream_position()?; - return Err(Error::EofInCase { - offset, - case_ofs: offset - case_start, - case_len: var_types.n_values() * 8, - }); + let mut values = Vec::with_capacity(case_vars.len()); + for var in case_vars { + match var { + CaseVar::Numeric => { + let Some(raw) = try_read_bytes(reader)? else { + return eof(reader, case_vars, case_start); + }; + values.push(Datum::Number(endian.parse(raw))); } - }; - values.push(Datum::from_raw(&UntypedDatum(raw), var_type, endian)); + CaseVar::String { width, encoding } => { + let mut datum = vec![0; *width]; + let mut offset = 0; + for segment in encoding { + if !try_read_bytes_into( + reader, + &mut datum[offset..offset + segment.data_bytes], + )? { + return eof(reader, case_vars, case_start); + } + skip_bytes(reader, segment.padding_bytes)?; + offset += segment.data_bytes; + } + values.push(Datum::String(RawString(datum))); + } + } } Ok(Some(values)) } + fn read_compressed_chunk( + reader: &mut R, + codes: &mut VecDeque, + endian: Endian, + bias: f64, + ) -> Result, Error> { + loop { + match codes.pop_front() { + Some(0) => (), + Some(252) => return Ok(None), + Some(253) => return Ok(Some(read_bytes(reader)?)), + Some(254) => return Ok(Some([b' '; 8])), + Some(255) => return Ok(Some(endian.to_bytes(-f64::MAX))), + Some(code) => return Ok(Some(endian.to_bytes(code as f64 - bias))), + None => { + match try_read_bytes::<8, _>(reader)? { + Some(new_codes) => codes.extend(new_codes.into_iter()), + None => return Ok(None), + }; + } + }; + } + } fn read_compressed_case( reader: &mut R, - var_types: &VarTypes, + case_vars: &[CaseVar], codes: &mut VecDeque, endian: Endian, bias: f64, ) -> Result>, Error> { + fn eof( + reader: &mut R, + case_vars: &[CaseVar], + case_start: u64, + ) -> Result>, Error> { + let offset = reader.stream_position()?; + if offset == case_start { + Ok(None) + } else { + Err(Error::EofInCase { + offset, + case_ofs: offset - case_start, + case_len: case_vars.iter().map(CaseVar::bytes).sum(), + }) + } + } + let case_start = reader.stream_position()?; - let mut values = Vec::with_capacity(var_types.n_values()); - for (i, var_type) in var_types.iter().enumerate() { - let value = loop { - let Some(code) = codes.pop_front() else { - let Some(new_codes): Option<[u8; 8]> = try_read_bytes(reader)? else { - if i == 0 { - return Ok(None); - } else { - let offset = reader.stream_position()?; - return Err(Error::EofInCompressedCase { - offset, - case_ofs: offset - case_start, - }); - } + let mut values = Vec::with_capacity(case_vars.len()); + for var in case_vars { + match var { + CaseVar::Numeric => { + let Some(raw) = Self::read_compressed_chunk(reader, codes, endian, bias)? + else { + return eof(reader, case_vars, case_start); }; - codes.extend(new_codes.into_iter()); - continue; - }; - match code { - 0 => (), - 1..=251 => match var_type { - VarType::Numeric => break Self::Number(Some(code as f64 - bias)), - VarType::String => { - break Self::String(RawStrArray(endian.to_bytes(code as f64 - bias))) - } - }, - 252 => { - if i == 0 { - return Ok(None); - } else { - let offset = reader.stream_position()?; - return Err(Error::PartialCompressedCase { - offset, - case_ofs: offset - case_start, - }); + values.push(Datum::Number(endian.parse(raw))); + } + CaseVar::String { width, encoding } => { + let mut datum = vec![0; *width]; + for segment in encoding { + let mut data_bytes = segment.data_bytes; + let mut padding_bytes = segment.padding_bytes; + while data_bytes > 0 || padding_bytes > 0 { + let Some(raw) = + Self::read_compressed_chunk(reader, codes, endian, bias)? + else { + return eof(reader, case_vars, case_start); + }; + let n_data = data_bytes.min(8); + datum.extend_from_slice(&raw[..n_data]); + data_bytes -= n_data; + padding_bytes -= 8 - n_data; } } - 253 => { - break Self::from_raw(&UntypedDatum(read_bytes(reader)?), var_type, endian) - } - 254 => match var_type { - VarType::String => break Self::String(RawStrArray(*b" ")), // XXX EBCDIC - VarType::Numeric => { - return Err(Error::CompressedStringExpected { - offset: case_start, - case_ofs: reader.stream_position()? - case_start, - }) - } - }, - 255 => match var_type { - VarType::Numeric => break Self::Number(None), - VarType::String => { - return Err(Error::CompressedNumberExpected { - offset: case_start, - case_ofs: reader.stream_position()? - case_start, - }) - } - }, + values.push(Datum::String(RawString(datum))); } - }; - values.push(value); + } } Ok(Some(values)) } +} +impl RawDatum { pub fn decode(&self, width: VarWidth) -> Datum { match self { Self::Number(x) => Datum::Number(*x), @@ -1062,11 +1102,13 @@ struct StringSegment { fn segment_widths(width: usize) -> impl Iterator { let n_segments = width.div_ceil(252); - repeat_n(255, n_segments - 1).chain(if n_segments > 1 { - std::iter::once(width - (n_segments - 1) * 252) - } else { - std::iter::once(width) - }) + repeat_n(255, n_segments - 1) + .chain(if n_segments > 1 { + std::iter::once(width - (n_segments - 1) * 252) + } else { + std::iter::once(width) + }) + .map(|w| w.next_multiple_of(8)) } enum CaseVar { @@ -1083,25 +1125,40 @@ impl CaseVar { VarWidth::Numeric => Self::Numeric, VarWidth::String(width) => { let width = width as usize; - let mut segments = SmallVec::<[StringSegment; 1]>::new(); + let mut encoding = SmallVec::<[StringSegment; 1]>::new(); let mut remaining = width; for segment in segment_widths(width) { - let data_bytes = remaining.min(255); - let padding_bytes = data_bytes.next_multiple_of(8) - data_bytes; - segments.push(StringSegment { - data_bytes, - padding_bytes, - }); - remaining -= data_bytes; + let data_bytes = remaining.min(segment).min(255); + let padding_bytes = segment - data_bytes; + if data_bytes > 0 { + encoding.push(StringSegment { + data_bytes, + padding_bytes, + }); + remaining -= data_bytes; + } else { + encoding.last_mut().unwrap().padding_bytes += padding_bytes; + } } + CaseVar::String { width, encoding } } } } + + fn bytes(&self) -> usize { + match self { + CaseVar::Numeric => 8, + CaseVar::String { width, encoding } => encoding + .iter() + .map(|segment| segment.data_bytes + segment.padding_bytes) + .sum(), + } + } } pub struct Cases { reader: Box, - vars: Vec, + case_vars: Vec, compression: Option, bias: f64, endian: Endian, @@ -1120,13 +1177,20 @@ impl Cases { where R: Read + Seek + 'static, { + let case_vars = var_types + .types + .iter() + .flatten() + .copied() + .map(CaseVar::new) + .collect::>(); Self { reader: if header.compression == Some(Compression::ZLib) { Box::new(ZlibDecodeMultiple::new(reader)) } else { Box::new(reader) }, - var_types, + case_vars, compression: header.compression, bias: header.bias, endian: header.endian, @@ -1147,14 +1211,14 @@ impl Iterator for Cases { let retval = if self.compression.is_some() { Datum::read_compressed_case( &mut self.reader, - &self.var_types, + &self.case_vars, &mut self.codes, self.endian, self.bias, ) .transpose() } else { - Datum::read_case(&mut self.reader, &self.var_types, self.endian).transpose() + Datum::read_case(&mut self.reader, &self.case_vars, self.endian).transpose() }; self.eof = matches!(retval, None | Some(Err(_))); retval @@ -2037,10 +2101,10 @@ impl ValueLabelRecord, RawString> { let Some(&first_index) = dict_indexes.first() else { return Ok(None); }; - let var_type = var_types.types[first_index as usize - 1].unwrap(); + let var_type = VarType::from(var_types.types[first_index as usize - 1].unwrap()); let mut wrong_type_indexes = Vec::new(); dict_indexes.retain(|&index| { - if var_types.types[index as usize - 1] != Some(var_type) { + if var_types.types[index as usize - 1].map(VarType::from) != Some(var_type) { wrong_type_indexes.push(index); false } else { @@ -3308,16 +3372,37 @@ impl ZTrailer { } } -fn try_read_bytes(r: &mut R) -> Result, IoError> { - let mut buf = [0; N]; - let n = r.read(&mut buf)?; +fn skip_bytes(r: &mut R, mut n: usize) -> Result<(), IoError> { + thread_local! { + static BUF: RefCell<[u8; 256]> = RefCell::new([0u8; 256]); + } + BUF.with_borrow_mut(|buf| { + while n > 0 { + let chunk = n.min(buf.len()); + r.read_exact(&mut buf[..n])?; + n -= chunk; + } + Ok(()) + }) +} + +fn try_read_bytes_into(r: &mut R, buf: &mut [u8]) -> Result { + let n = r.read(buf)?; if n > 0 { - if n < N { + if n < buf.len() { r.read_exact(&mut buf[n..])?; } - Ok(Some(buf)) + Ok(true) } else { - Ok(None) + Ok(false) + } +} + +fn try_read_bytes(r: &mut R) -> Result, IoError> { + let mut buf = [0; N]; + match try_read_bytes_into(r, &mut buf)? { + true => Ok(Some(buf)), + false => Ok(None), } } @@ -3428,7 +3513,7 @@ impl LongStringValueLabelRecord { #[derive(Default)] pub struct VarTypes { - pub types: Vec>, + pub types: Vec>, } impl VarTypes { @@ -3437,8 +3522,8 @@ impl VarTypes { } pub fn push(&mut self, width: RawWidth) { - if let Ok(var_type) = VarType::try_from(width) { - self.types.push(Some(var_type)); + if let Ok(var_width) = VarWidth::try_from(width) { + self.types.push(Some(var_width)); for _ in 1..width.n_values().unwrap() { self.types.push(None); } @@ -3455,18 +3540,12 @@ impl VarTypes { pub fn var_type_at(&self, index: usize) -> Option { if index >= 1 && index <= self.types.len() { - self.types[index - 1] + self.types[index - 1].map(VarType::from) } else { None } } - pub fn iter(&self) -> impl Iterator + use<'_> { - self.types - .iter() - .map(|var_type| var_type.unwrap_or(VarType::String)) - } - pub fn n_vars(&self) -> usize { self.types.iter().flatten().count() }