use binrw::{BinWrite, Endian, Error as BinError};
use chrono::Local;
use encoding_rs::Encoding;
-use flate2::{Compress, FlushCompress, Status};
+use flate2::write::ZlibEncoder;
use itertools::zip_eq;
use smallvec::SmallVec;
use crate::{
- data::{Case, Datum},
+ data::Datum,
dictionary::{
Alignment, Attributes, CategoryLabels, Dictionary, Measure, MultipleResponseType,
ValueLabels, VarWidth,
/// Blanket implementation: every type that is both `Write` and `Seek` is a `WriteSeek`.
impl<T> WriteSeek for T where T: Write + Seek {}
impl WriterInner {
+ fn finish(mut self) -> Result<(), BinError> { // Consumes `self`; the body only flushes.
+ self.flush_compressed() // The flush result is returned to the caller unchanged.
+ }
+
fn flush_compressed(&mut self) -> Result<(), BinError> {
if !self.opcodes.is_empty() {
self.opcodes.resize(8, 0);
},
})
}
-}
-impl Writer {
+ /// Finishes writing the file, flushing buffers and updating headers to match
+ /// the final case counts.  Consumes the writer and delegates to [`Self::try_finish`].
+ pub fn finish(mut self) -> Result<(), BinError> {
+ self.try_finish()
+ }
+
+ /// Tries to finish writing the file, flushing buffers and updating headers
+ /// to match the final case counts.
+ ///
+ /// # Panics
+ ///
+ /// Attempts to write more cases after calling this function may result in a
+ /// panic.
+ pub fn try_finish(&mut self) -> Result<(), BinError> {
+ self.inner.flush_compressed() // NOTE(review): only a flush is visible here; header update presumably happens inside — confirm.
+ }
+
/// Writes `case` to the system file.
///
/// Dispatches on `self.compression`: the compressed writer is used when a
/// compression mode is set, the uncompressed writer otherwise.
///
/// # Panics
///
/// Both writers pair the data with the writer's case variables using
/// `zip_eq`, so `case` must yield exactly one datum per case variable.
- pub fn write_case(&mut self, case: &Case) -> Result<(), BinError> {
+ pub fn write_case<'a>(
+ &mut self,
+ case: impl IntoIterator<Item = &'a Datum>,
+ ) -> Result<(), BinError> {
match self.compression {
- Some(_) => self.write_case_compressed(case),
- None => self.write_case_uncompressed(case),
+ Some(_) => self.write_case_compressed(case.into_iter()),
+ None => self.write_case_uncompressed(case.into_iter()),
}
}
- fn write_case_uncompressed(&mut self, case: &Case) -> Result<(), BinError> {
- for (var, datum) in zip_eq(&self.case_vars, &case.0) {
+ fn write_case_uncompressed<'a>(
+ &mut self,
+ case: impl Iterator<Item = &'a Datum>,
+ ) -> Result<(), BinError> {
+ for (var, datum) in zip_eq(&self.case_vars, case) {
match var {
CaseVar::Numeric => datum
.as_number()
}
Ok(())
}
- fn write_case_compressed(&mut self, case: &Case) -> Result<(), BinError> {
- for (var, datum) in zip_eq(&self.case_vars, &case.0) {
+ fn write_case_compressed<'a>(
+ &mut self,
+ case: impl Iterator<Item = &'a Datum>,
+ ) -> Result<(), BinError> {
+ for (var, datum) in zip_eq(&self.case_vars, case) {
match var {
CaseVar::Numeric => match datum.as_number().unwrap() {
None => self.inner.put_opcode(255)?,
}
}
+impl Drop for Writer {
+ fn drop(&mut self) {
+ let _ = self.try_finish(); // Best effort: `drop` cannot return the error, so it is discarded.
+ }
+}
+
struct Block {
uncompressed_size: u64,
compressed_size: u64,
{
header: RawZHeader,
trailer: RawZTrailer,
- compress: Compress,
- buf: Vec<u8>,
+ encoder: ZlibEncoder<Vec<u8>>,
inner: W,
}
block_size: ZBLOCK_SIZE as u32,
blocks: Vec::new(),
},
- compress: Compress::new(flate2::Compression::new(5), false),
- buf: Vec::with_capacity(4096),
+ encoder: ZlibEncoder::new(Vec::new(), flate2::Compression::new(1)),
inner,
})
}
+
+ /// Finishes the zlib stream for the current block, writes the compressed
+ /// bytes to `inner`, and records the block in the trailer.
+ ///
+ /// Does nothing if no input has been fed to the encoder since the last
+ /// block was flushed.
+ ///
+ /// # Errors
+ ///
+ /// Returns any I/O error from finishing the encoder or writing to `inner`.
+ fn flush_block(&mut self) -> std::io::Result<()> {
+     let total_in = self.encoder.total_in();
+     if total_in == 0 {
+         return Ok(());
+     }
+
+     // `reset` finishes the current stream into the encoder's output vector
+     // and hands that vector back, leaving a freshly reset compressor behind.
+     let mut buf = self.encoder.reset(Vec::new())?;
+     let total_out = buf.len();
+     self.inner.write_all(&buf)?;
+
+     // Return the allocation to the encoder for the next block.  It must be
+     // emptied first: the encoder appends to its output vector, so stale
+     // bytes would otherwise be emitted again as part of the next block and
+     // counted in its compressed size.
+     buf.clear();
+     self.encoder.reset(buf)?;
+
+     // Each block starts where the previous one ended; the first block's
+     // offsets are derived from the header position.
+     let (uncompressed_ofs, compressed_ofs) = match self.trailer.blocks.last() {
+         Some(prev) => (
+             prev.uncompressed_ofs + prev.uncompressed_size as u64,
+             prev.compressed_ofs + prev.compressed_size as u64,
+         ),
+         None => (
+             self.header.zheader_offset,
+             self.header.zheader_offset + 24,
+         ),
+     };
+
+     // NOTE(review): the narrowing casts assume both sizes fit in 32 bits
+     // (`write` caps a block's input at `ZBLOCK_SIZE`) — confirm.
+     self.trailer.blocks.push(ZBlock {
+         uncompressed_size: total_in as u32,
+         compressed_size: total_out as u32,
+         uncompressed_ofs,
+         compressed_ofs,
+     });
+     Ok(())
+ }
+
fn try_finish(&mut self) -> Result<(), BinError> {
- self.flush()?;
+ self.flush_block()?;
let ztrailer_offset = self.inner.stream_position()?;
self.trailer.write_le(&mut self.inner)?;
let header = RawZHeader {
ztrailer_offset,
ztrailer_len: self.trailer.len() as u64,
};
- dbg!(&header);
self.inner.seek(SeekFrom::Start(header.zheader_offset))?;
header.write_le(&mut self.inner)
}
+
/// Consumes the writer and finalizes the stream by calling `try_finish`,
/// returning its result.
///
/// NOTE(review): a `Drop` impl in this file also calls `try_finish`; if it
/// belongs to this type, `try_finish` presumably runs a second time when
/// `self` goes out of scope here — confirm that a repeated call is harmless.
fn finish(mut self) -> Result<(), BinError> {
self.try_finish()
}
W: Write + Seek,
{
fn drop(&mut self) {
- dbg!();
let _ = self.try_finish(); // Best effort: `drop` cannot return the error, so it is discarded.
}
}
/// Feeds `buf` to the block encoder, starting a new block whenever the
/// current one has already taken in `ZBLOCK_SIZE` bytes.
///
/// On success the whole buffer has been consumed and its original length is
/// returned.
fn write(&mut self, mut buf: &[u8]) -> Result<usize, IoError> {
let n = buf.len();
while buf.len() > 0 {
- if self.compress.total_in() >= ZBLOCK_SIZE {
- self.flush()?;
+ if self.encoder.total_in() >= ZBLOCK_SIZE {
+ self.flush_block()?;
}
let chunk = buf // Never feed more than the room left in the current block.
.len()
- .min((ZBLOCK_SIZE - self.compress.total_in()) as usize);
- let in_before = self.compress.total_in();
- self.buf.clear();
- self.compress
- .compress_vec(&buf[..chunk], &mut self.buf, FlushCompress::None)
- .unwrap();
- let consumed = self.compress.total_in() - in_before;
- self.inner.write_all(&self.buf)?;
- buf = &buf[consumed as usize..];
+ .min((ZBLOCK_SIZE - self.encoder.total_in()) as usize);
+ self.encoder.write_all(&buf[..chunk])?;
+ buf = &buf[chunk..];
}
Ok(n)
}
/// Does nothing and reports success.
///
/// NOTE(review): compressed output appears to be emitted only by
/// `flush_block`, so data still buffered in the encoder is presumably not
/// pushed to `inner` by this method — confirm that this is intentional.
fn flush(&mut self) -> std::io::Result<()> {
- if self.compress.total_in() > 0 {
- let mut status = Status::Ok;
- while status == Status::Ok {
- self.buf.clear();
- status = self
- .compress
- .compress_vec(&[], &mut self.buf, FlushCompress::Finish)
- .unwrap();
- self.inner.write_all(&self.buf)?;
- }
- assert_eq!(status, Status::StreamEnd);
-
- self.trailer.blocks.push(ZBlock {
- uncompressed_size: self.compress.total_in() as u32,
- compressed_size: self.compress.total_out() as u32,
- uncompressed_ofs: match self.trailer.blocks.last() {
- Some(prev) => prev.uncompressed_ofs + prev.uncompressed_size as u64,
- None => self.header.zheader_offset,
- },
- compressed_ofs: match self.trailer.blocks.last() {
- Some(prev) => prev.compressed_ofs + prev.compressed_size as u64,
- None => self.header.zheader_offset + 24,
- },
- });
- self.compress.reset();
- }
Ok(())
}
}
W: Write + Seek,
{
/// Always fails with `ErrorKind::NotSeekable`; the requested position `_pos`
/// is ignored.
fn seek(&mut self, _pos: std::io::SeekFrom) -> Result<u64, IoError> {
- panic!();
Err(IoError::from(ErrorKind::NotSeekable))
}
}