diff options
Diffstat (limited to 'src/cmd/join.rs')
-rw-r--r-- | src/cmd/join.rs | 248 |
1 files changed, 124 insertions, 124 deletions
diff --git a/src/cmd/join.rs b/src/cmd/join.rs index 79be11e..5f501c0 100644 --- a/src/cmd/join.rs +++ b/src/cmd/join.rs @@ -6,11 +6,11 @@ use std::iter::repeat; use std::str; use byteorder::{WriteBytesExt, BigEndian}; -use csv::{self, ByteString}; -use csv::index::Indexed; +use csv; use CliResult; use config::{Config, Delimiter}; +use index::Indexed; use select::{SelectColumns, Selection}; use util; @@ -71,6 +71,8 @@ Common options: Must be a single character. (default: ,) "; +type ByteString = Vec<u8>; + #[derive(RustcDecodable)] struct Args { arg_columns1: SelectColumns, @@ -89,8 +91,8 @@ struct Args { } pub fn run(argv: &[&str]) -> CliResult<()> { - let args: Args = try!(util::get_args(USAGE, argv)); - let mut state = try!(args.new_io_state()); + let args: Args = util::get_args(USAGE, argv)?; + let mut state = args.new_io_state()?; match ( args.flag_left, args.flag_right, @@ -98,23 +100,23 @@ pub fn run(argv: &[&str]) -> CliResult<()> { args.flag_cross, ) { (true, false, false, false) => { - try!(state.write_headers()); + state.write_headers()?; state.outer_join(false) } (false, true, false, false) => { - try!(state.write_headers()); + state.write_headers()?; state.outer_join(true) } (false, false, true, false) => { - try!(state.write_headers()); + state.write_headers()?; state.full_outer_join() } (false, false, false, true) => { - try!(state.write_headers()); + state.write_headers()?; state.cross_join() } (false, false, false, false) => { - try!(state.write_headers()); + state.write_headers()?; state.inner_join() } _ => fail!("Please pick exactly one join operation.") @@ -135,29 +137,29 @@ struct IoState<R, W: io::Write> { impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> { fn write_headers(&mut self) -> CliResult<()> { if !self.no_headers { - let mut headers = try!(self.rdr1.byte_headers()); - headers.extend(try!(self.rdr2.byte_headers()).into_iter()); - try!(self.wtr.write(headers.into_iter())); + let mut headers = self.rdr1.byte_headers()?.clone(); + headers.extend(self.rdr2.byte_headers()?.iter()); + self.wtr.write_record(&headers)?; } Ok(()) } fn inner_join(mut self) -> CliResult<()> { - let mut validx = try!(ValueIndex::new(self.rdr2, &self.sel2, - self.casei, self.nulls)); + let mut scratch = csv::ByteRecord::new(); + let mut validx = ValueIndex::new( + self.rdr2, &self.sel2, self.casei, self.nulls)?; for row in self.rdr1.byte_records() { - let row = try!(row); + let row = row?; let key = get_row_key(&self.sel1, &row, self.casei); match validx.values.get(&key) { None => continue, Some(rows) => { for &rowi in rows.iter() { - try!(validx.idx.seek(rowi as u64)); + validx.idx.seek(rowi as u64)?; - let mut row1 = row.iter().map(|f| Ok(&**f)); - let row2 = unsafe { validx.idx.byte_fields() }; - let combined = row1.by_ref().chain(row2); - try!(self.wtr.write_iter(combined)); + validx.idx.read_byte_record(&mut scratch)?; + let combined = row.iter().chain(scratch.iter()); + self.wtr.write_record(combined)?; } } } @@ -171,33 +173,30 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> { ::std::mem::swap(&mut self.sel1, &mut self.sel2); } - let (_, pad2) = try!(self.get_padding()); - let mut validx = try!(ValueIndex::new(self.rdr2, &self.sel2, - self.casei, self.nulls)); + let mut scratch = csv::ByteRecord::new(); + let (_, pad2) = self.get_padding()?; + let mut validx = ValueIndex::new( + self.rdr2, &self.sel2, self.casei, self.nulls)?; for row in self.rdr1.byte_records() { - let row = try!(row); - let key = get_row_key(&self.sel1, &*row, self.casei); + let row = row?; + let key = get_row_key(&self.sel1, &row, self.casei); match validx.values.get(&key) { None => { - let row1 = row.iter().map(|f| Ok(&**f)); - let row2 = pad2.iter().map(|f| Ok(&**f)); if right { - try!(self.wtr.write_iter(row2.chain(row1))); + self.wtr.write_record(pad2.iter().chain(&row))?; } else { - try!(self.wtr.write_iter(row1.chain(row2))); + self.wtr.write_record(row.iter().chain(&pad2))?; } } Some(rows) => { for &rowi in rows.iter() { - try!(validx.idx.seek(rowi as u64)); - let row1 = row.iter().map(|f| Ok(&**f)); - let row2 = unsafe { - validx.idx.byte_fields() - }; + validx.idx.seek(rowi as u64)?; + let row1 = row.iter(); + validx.idx.read_byte_record(&mut scratch)?; if right { - try!(self.wtr.write_iter(row2.chain(row1))); + self.wtr.write_record(scratch.iter().chain(row1))?; } else { - try!(self.wtr.write_iter(row1.chain(row2))); + self.wtr.write_record(row1.chain(&scratch))?; } } } @@ -207,32 +206,28 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> { } fn full_outer_join(mut self) -> CliResult<()> { - let (pad1, pad2) = try!(self.get_padding()); - let mut validx = try!(ValueIndex::new(self.rdr2, &self.sel2, - self.casei, self.nulls)); + let mut scratch = csv::ByteRecord::new(); + let (pad1, pad2) = self.get_padding()?; + let mut validx = ValueIndex::new( + self.rdr2, &self.sel2, self.casei, self.nulls)?; // Keep track of which rows we've written from rdr2. let mut rdr2_written: Vec<_> = repeat(false).take(validx.num_rows).collect(); for row1 in self.rdr1.byte_records() { - let row1 = try!(row1); - let key = get_row_key(&self.sel1, &*row1, self.casei); + let row1 = row1?; + let key = get_row_key(&self.sel1, &row1, self.casei); match validx.values.get(&key) { None => { - let row1 = row1.iter().map(|f| Ok(&**f)); - let row2 = pad2.iter().map(|f| Ok(&**f)); - try!(self.wtr.write_iter(row1.chain(row2))); + self.wtr.write_record(row1.iter().chain(&pad2))?; } Some(rows) => { for &rowi in rows.iter() { rdr2_written[rowi] = true; - try!(validx.idx.seek(rowi as u64)); - let row1 = row1.iter().map(|f| Ok(&**f)); - let row2 = unsafe { - validx.idx.byte_fields() - }; - try!(self.wtr.write_iter(row1.chain(row2))); + validx.idx.seek(rowi as u64)?; + validx.idx.read_byte_record(&mut scratch)?; + self.wtr.write_record(row1.iter().chain(&scratch))?; } } } @@ -242,46 +237,41 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> { // from rdr1. for (i, &written) in rdr2_written.iter().enumerate() { if !written { - try!(validx.idx.seek(i as u64)); - let row1 = pad1.iter().map(|f| Ok(&**f)); - let row2 = unsafe { - validx.idx.byte_fields() - }; - try!(self.wtr.write_iter(row1.chain(row2))); + validx.idx.seek(i as u64)?; + validx.idx.read_byte_record(&mut scratch)?; + self.wtr.write_record(pad1.iter().chain(&scratch))?; } } Ok(()) } fn cross_join(mut self) -> CliResult<()> { + let mut pos = csv::Position::new(); + pos.set_byte(0); + let mut row2 = csv::ByteRecord::new(); for row1 in self.rdr1.byte_records() { - let row1 = try!(row1); - - try!(self.rdr2.seek(0)); - let mut first = true; - while !self.rdr2.done() { - // Skip the header row. The raw byte interface won't - // do it for us. - if first && !self.no_headers { - while let Some(f) = - self.rdr2.next_bytes().into_iter_result() { try!(f); } - first = false; - } - let row1 = row1.iter().map(|f| Ok(&**f)); - let row2 = unsafe { self.rdr2.byte_fields() }; - try!(self.wtr.write_iter(row1.chain(row2))); + let row1 = row1?; + self.rdr2.seek(pos.clone())?; + if self.rdr2.has_headers() { + // Read and skip the header row, since CSV readers disable + // the header skipping logic after being seeked. + self.rdr2.read_byte_record(&mut row2)?; + } + while self.rdr2.read_byte_record(&mut row2)? { + self.wtr.write_record(row1.iter().chain(&row2))?; } } Ok(()) } - fn get_padding(&mut self) - -> CliResult<(Vec<ByteString>, Vec<ByteString>)> { - let len1 = try!(self.rdr1.byte_headers()).len(); - let len2 = try!(self.rdr2.byte_headers()).len(); + fn get_padding( + &mut self, + ) -> CliResult<(csv::ByteRecord, csv::ByteRecord)> { + let len1 = self.rdr1.byte_headers()?.len(); + let len2 = self.rdr2.byte_headers()?.len(); Ok(( - repeat(util::empty_field()).take(len1).collect(), - repeat(util::empty_field()).take(len2).collect(), + repeat(b"").take(len1).collect(), + repeat(b"").take(len2).collect(), )) } } @@ -290,20 +280,20 @@ impl Args { fn new_io_state(&self) -> CliResult<IoState<fs::File, Box<io::Write+'static>>> { let rconf1 = Config::new(&Some(self.arg_input1.clone())) - .delimiter(self.flag_delimiter) - .no_headers(self.flag_no_headers) - .select(self.arg_columns1.clone()); + .delimiter(self.flag_delimiter) + .no_headers(self.flag_no_headers) + .select(self.arg_columns1.clone()); let rconf2 = Config::new(&Some(self.arg_input2.clone())) - .delimiter(self.flag_delimiter) - .no_headers(self.flag_no_headers) - .select(self.arg_columns2.clone()); - - let mut rdr1 = try!(rconf1.reader_file()); - let mut rdr2 = try!(rconf2.reader_file()); - let (sel1, sel2) = try!(self.get_selections(&rconf1, &mut rdr1, - &rconf2, &mut rdr2)); + .delimiter(self.flag_delimiter) + .no_headers(self.flag_no_headers) + .select(self.arg_columns2.clone()); + + let mut rdr1 = rconf1.reader_file()?; + let mut rdr2 = rconf2.reader_file()?; + let (sel1, sel2) = self.get_selections( + &rconf1, &mut rdr1, &rconf2, &mut rdr2)?; Ok(IoState { - wtr: try!(Config::new(&self.flag_output).writer()), + wtr: Config::new(&self.flag_output).writer()?, rdr1: rdr1, sel1: sel1, rdr2: rdr2, @@ -314,15 +304,15 @@ impl Args { }) } - fn get_selections<R: io::Read> - (&self, - rconf1: &Config, rdr1: &mut csv::Reader<R>, - rconf2: &Config, rdr2: &mut csv::Reader<R>) - -> CliResult<(Selection, Selection)> { - let headers1 = try!(rdr1.byte_headers()); - let headers2 = try!(rdr2.byte_headers()); - let select1 = try!(rconf1.selection(&*headers1)); - let select2 = try!(rconf2.selection(&*headers2)); + fn get_selections<R: io::Read>( + &self, + rconf1: &Config, rdr1: &mut csv::Reader<R>, + rconf2: &Config, rdr2: &mut csv::Reader<R>, + ) -> CliResult<(Selection, Selection)> { + let headers1 = rdr1.byte_headers()?; + let headers2 = rdr2.byte_headers()?; + let select1 = rconf1.selection(&*headers1)?; + let select2 = rconf2.selection(&*headers2)?; if select1.len() != select2.len() { return fail!(format!( "Column selections must have the same number of columns, \ @@ -341,40 +331,44 @@ struct ValueIndex<R> { } impl<R: io::Read + io::Seek> ValueIndex<R> { - fn new(mut rdr: csv::Reader<R>, sel: &Selection, - casei: bool, nulls: bool) - -> CliResult<ValueIndex<R>> { + fn new( + mut rdr: csv::Reader<R>, + sel: &Selection, + casei: bool, + nulls: bool, + ) -> CliResult<ValueIndex<R>> { let mut val_idx = HashMap::with_capacity(10000); let mut row_idx = io::Cursor::new(Vec::with_capacity(8 * 10000)); let (mut rowi, mut count) = (0usize, 0usize); - let row_len = try!(rdr.byte_headers()).len(); // This logic is kind of tricky. Basically, we want to include // the header row in the line index (because that's what csv::index // does), but we don't want to include header values in the ValueIndex. - if !rdr.has_headers { + if !rdr.has_headers() { // ... so if there are no headers, we seek to the beginning and // index everything. - try!(rdr.seek(0)); + let mut pos = csv::Position::new(); + pos.set_byte(0); + rdr.seek(pos)?; } else { // ... and if there are headers, we make sure that we've parsed // them, and write the offset of the header row to the index. - try!(rdr.byte_headers()); - try!(row_idx.write_u64::<BigEndian>(0)); + rdr.byte_headers()?; + row_idx.write_u64::<BigEndian>(0)?; count += 1; } - while !rdr.done() { - // This is a bit hokey. We're doing this manually instead of - // calling `csv::index::create` so we can create both indexes - // in one pass. - try!(row_idx.write_u64::<BigEndian>(rdr.byte_offset())); - - let mut row = Vec::with_capacity(row_len); - while let Some(r) = rdr.next_bytes().into_iter_result() { - row.push(try!(r).to_vec()); - } - let fields: Vec<_> = sel.select(&row).map(|v| transform(v, casei)).collect(); + let mut row = csv::ByteRecord::new(); + while rdr.read_byte_record(&mut row)? { + // This is a bit hokey. We're doing this manually instead of using + // the `csv-index` crate directly so that we can create both + // indexes in one pass. + row_idx.write_u64::<BigEndian>(row.position().unwrap().byte())?; + + let fields: Vec<_> = sel + .select(&row) + .map(|v| transform(v, casei)) + .collect(); if nulls || !fields.iter().any(|f| f.is_empty()) { match val_idx.entry(fields) { Entry::Vacant(v) => { @@ -382,17 +376,20 @@ impl<R: io::Read + io::Seek> ValueIndex<R> { rows.push(rowi); v.insert(rows); } - Entry::Occupied(mut v) => { v.get_mut().push(rowi); } + Entry::Occupied(mut v) => { + v.get_mut().push(rowi); + } } } rowi += 1; count += 1; } - try!(row_idx.write_u64::<BigEndian>(count as u64)); + + row_idx.write_u64::<BigEndian>(count as u64)?; + let idx = Indexed::open(rdr, io::Cursor::new(row_idx.into_inner()))?; Ok(ValueIndex { values: val_idx, - idx: try!(Indexed::open(rdr, - io::Cursor::new(row_idx.into_inner()))), + idx: idx, num_rows: rowi, }) } @@ -408,14 +405,17 @@ impl<R> fmt::Debug for ValueIndex<R> { let keys = keys.iter() .map(|k| String::from_utf8(k.to_vec()).unwrap()) .collect::<Vec<_>>(); - try!(writeln!(f, "({}) => {:?}", keys.connect(", "), rows)) + writeln!(f, "({}) => {:?}", keys.join(", "), rows)? } Ok(()) } } -fn get_row_key(sel: &Selection, row: &[ByteString], casei: bool) - -> Vec<ByteString> { +fn get_row_key( + sel: &Selection, + row: &csv::ByteRecord, + casei: bool, +) -> Vec<ByteString> { sel.select(row).map(|v| transform(&v, casei)).collect() } |