summary refs log tree commit diff stats
path: root/src/cmd/join.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/cmd/join.rs')
-rw-r--r-- src/cmd/join.rs 248
1 files changed, 124 insertions, 124 deletions
diff --git a/src/cmd/join.rs b/src/cmd/join.rs
index 79be11e..5f501c0 100644
--- a/src/cmd/join.rs
+++ b/src/cmd/join.rs
@@ -6,11 +6,11 @@ use std::iter::repeat;
use std::str;
use byteorder::{WriteBytesExt, BigEndian};
-use csv::{self, ByteString};
-use csv::index::Indexed;
+use csv;
use CliResult;
use config::{Config, Delimiter};
+use index::Indexed;
use select::{SelectColumns, Selection};
use util;
@@ -71,6 +71,8 @@ Common options:
Must be a single character. (default: ,)
";
+type ByteString = Vec<u8>;
+
#[derive(RustcDecodable)]
struct Args {
arg_columns1: SelectColumns,
@@ -89,8 +91,8 @@ struct Args {
}
pub fn run(argv: &[&str]) -> CliResult<()> {
- let args: Args = try!(util::get_args(USAGE, argv));
- let mut state = try!(args.new_io_state());
+ let args: Args = util::get_args(USAGE, argv)?;
+ let mut state = args.new_io_state()?;
match (
args.flag_left,
args.flag_right,
@@ -98,23 +100,23 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
args.flag_cross,
) {
(true, false, false, false) => {
- try!(state.write_headers());
+ state.write_headers()?;
state.outer_join(false)
}
(false, true, false, false) => {
- try!(state.write_headers());
+ state.write_headers()?;
state.outer_join(true)
}
(false, false, true, false) => {
- try!(state.write_headers());
+ state.write_headers()?;
state.full_outer_join()
}
(false, false, false, true) => {
- try!(state.write_headers());
+ state.write_headers()?;
state.cross_join()
}
(false, false, false, false) => {
- try!(state.write_headers());
+ state.write_headers()?;
state.inner_join()
}
_ => fail!("Please pick exactly one join operation.")
@@ -135,29 +137,29 @@ struct IoState<R, W: io::Write> {
impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {
fn write_headers(&mut self) -> CliResult<()> {
if !self.no_headers {
- let mut headers = try!(self.rdr1.byte_headers());
- headers.extend(try!(self.rdr2.byte_headers()).into_iter());
- try!(self.wtr.write(headers.into_iter()));
+ let mut headers = self.rdr1.byte_headers()?.clone();
+ headers.extend(self.rdr2.byte_headers()?.iter());
+ self.wtr.write_record(&headers)?;
}
Ok(())
}
fn inner_join(mut self) -> CliResult<()> {
- let mut validx = try!(ValueIndex::new(self.rdr2, &self.sel2,
- self.casei, self.nulls));
+ let mut scratch = csv::ByteRecord::new();
+ let mut validx = ValueIndex::new(
+ self.rdr2, &self.sel2, self.casei, self.nulls)?;
for row in self.rdr1.byte_records() {
- let row = try!(row);
+ let row = row?;
let key = get_row_key(&self.sel1, &row, self.casei);
match validx.values.get(&key) {
None => continue,
Some(rows) => {
for &rowi in rows.iter() {
- try!(validx.idx.seek(rowi as u64));
+ validx.idx.seek(rowi as u64)?;
- let mut row1 = row.iter().map(|f| Ok(&**f));
- let row2 = unsafe { validx.idx.byte_fields() };
- let combined = row1.by_ref().chain(row2);
- try!(self.wtr.write_iter(combined));
+ validx.idx.read_byte_record(&mut scratch)?;
+ let combined = row.iter().chain(scratch.iter());
+ self.wtr.write_record(combined)?;
}
}
}
@@ -171,33 +173,30 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {
::std::mem::swap(&mut self.sel1, &mut self.sel2);
}
- let (_, pad2) = try!(self.get_padding());
- let mut validx = try!(ValueIndex::new(self.rdr2, &self.sel2,
- self.casei, self.nulls));
+ let mut scratch = csv::ByteRecord::new();
+ let (_, pad2) = self.get_padding()?;
+ let mut validx = ValueIndex::new(
+ self.rdr2, &self.sel2, self.casei, self.nulls)?;
for row in self.rdr1.byte_records() {
- let row = try!(row);
- let key = get_row_key(&self.sel1, &*row, self.casei);
+ let row = row?;
+ let key = get_row_key(&self.sel1, &row, self.casei);
match validx.values.get(&key) {
None => {
- let row1 = row.iter().map(|f| Ok(&**f));
- let row2 = pad2.iter().map(|f| Ok(&**f));
if right {
- try!(self.wtr.write_iter(row2.chain(row1)));
+ self.wtr.write_record(pad2.iter().chain(&row))?;
} else {
- try!(self.wtr.write_iter(row1.chain(row2)));
+ self.wtr.write_record(row.iter().chain(&pad2))?;
}
}
Some(rows) => {
for &rowi in rows.iter() {
- try!(validx.idx.seek(rowi as u64));
- let row1 = row.iter().map(|f| Ok(&**f));
- let row2 = unsafe {
- validx.idx.byte_fields()
- };
+ validx.idx.seek(rowi as u64)?;
+ let row1 = row.iter();
+ validx.idx.read_byte_record(&mut scratch)?;
if right {
- try!(self.wtr.write_iter(row2.chain(row1)));
+ self.wtr.write_record(scratch.iter().chain(row1))?;
} else {
- try!(self.wtr.write_iter(row1.chain(row2)));
+ self.wtr.write_record(row1.chain(&scratch))?;
}
}
}
@@ -207,32 +206,28 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {
}
fn full_outer_join(mut self) -> CliResult<()> {
- let (pad1, pad2) = try!(self.get_padding());
- let mut validx = try!(ValueIndex::new(self.rdr2, &self.sel2,
- self.casei, self.nulls));
+ let mut scratch = csv::ByteRecord::new();
+ let (pad1, pad2) = self.get_padding()?;
+ let mut validx = ValueIndex::new(
+ self.rdr2, &self.sel2, self.casei, self.nulls)?;
// Keep track of which rows we've written from rdr2.
let mut rdr2_written: Vec<_> =
repeat(false).take(validx.num_rows).collect();
for row1 in self.rdr1.byte_records() {
- let row1 = try!(row1);
- let key = get_row_key(&self.sel1, &*row1, self.casei);
+ let row1 = row1?;
+ let key = get_row_key(&self.sel1, &row1, self.casei);
match validx.values.get(&key) {
None => {
- let row1 = row1.iter().map(|f| Ok(&**f));
- let row2 = pad2.iter().map(|f| Ok(&**f));
- try!(self.wtr.write_iter(row1.chain(row2)));
+ self.wtr.write_record(row1.iter().chain(&pad2))?;
}
Some(rows) => {
for &rowi in rows.iter() {
rdr2_written[rowi] = true;
- try!(validx.idx.seek(rowi as u64));
- let row1 = row1.iter().map(|f| Ok(&**f));
- let row2 = unsafe {
- validx.idx.byte_fields()
- };
- try!(self.wtr.write_iter(row1.chain(row2)));
+ validx.idx.seek(rowi as u64)?;
+ validx.idx.read_byte_record(&mut scratch)?;
+ self.wtr.write_record(row1.iter().chain(&scratch))?;
}
}
}
@@ -242,46 +237,41 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {
// from rdr1.
for (i, &written) in rdr2_written.iter().enumerate() {
if !written {
- try!(validx.idx.seek(i as u64));
- let row1 = pad1.iter().map(|f| Ok(&**f));
- let row2 = unsafe {
- validx.idx.byte_fields()
- };
- try!(self.wtr.write_iter(row1.chain(row2)));
+ validx.idx.seek(i as u64)?;
+ validx.idx.read_byte_record(&mut scratch)?;
+ self.wtr.write_record(pad1.iter().chain(&scratch))?;
}
}
Ok(())
}
fn cross_join(mut self) -> CliResult<()> {
+ let mut pos = csv::Position::new();
+ pos.set_byte(0);
+ let mut row2 = csv::ByteRecord::new();
for row1 in self.rdr1.byte_records() {
- let row1 = try!(row1);
-
- try!(self.rdr2.seek(0));
- let mut first = true;
- while !self.rdr2.done() {
- // Skip the header row. The raw byte interface won't
- // do it for us.
- if first && !self.no_headers {
- while let Some(f) =
- self.rdr2.next_bytes().into_iter_result() { try!(f); }
- first = false;
- }
- let row1 = row1.iter().map(|f| Ok(&**f));
- let row2 = unsafe { self.rdr2.byte_fields() };
- try!(self.wtr.write_iter(row1.chain(row2)));
+ let row1 = row1?;
+ self.rdr2.seek(pos.clone())?;
+ if self.rdr2.has_headers() {
+ // Read and skip the header row, since CSV readers disable
+ // the header skipping logic after being seeked.
+ self.rdr2.read_byte_record(&mut row2)?;
+ }
+ while self.rdr2.read_byte_record(&mut row2)? {
+ self.wtr.write_record(row1.iter().chain(&row2))?;
}
}
Ok(())
}
- fn get_padding(&mut self)
- -> CliResult<(Vec<ByteString>, Vec<ByteString>)> {
- let len1 = try!(self.rdr1.byte_headers()).len();
- let len2 = try!(self.rdr2.byte_headers()).len();
+ fn get_padding(
+ &mut self,
+ ) -> CliResult<(csv::ByteRecord, csv::ByteRecord)> {
+ let len1 = self.rdr1.byte_headers()?.len();
+ let len2 = self.rdr2.byte_headers()?.len();
Ok((
- repeat(util::empty_field()).take(len1).collect(),
- repeat(util::empty_field()).take(len2).collect(),
+ repeat(b"").take(len1).collect(),
+ repeat(b"").take(len2).collect(),
))
}
}
@@ -290,20 +280,20 @@ impl Args {
fn new_io_state(&self)
-> CliResult<IoState<fs::File, Box<io::Write+'static>>> {
let rconf1 = Config::new(&Some(self.arg_input1.clone()))
- .delimiter(self.flag_delimiter)
- .no_headers(self.flag_no_headers)
- .select(self.arg_columns1.clone());
+ .delimiter(self.flag_delimiter)
+ .no_headers(self.flag_no_headers)
+ .select(self.arg_columns1.clone());
let rconf2 = Config::new(&Some(self.arg_input2.clone()))
- .delimiter(self.flag_delimiter)
- .no_headers(self.flag_no_headers)
- .select(self.arg_columns2.clone());
-
- let mut rdr1 = try!(rconf1.reader_file());
- let mut rdr2 = try!(rconf2.reader_file());
- let (sel1, sel2) = try!(self.get_selections(&rconf1, &mut rdr1,
- &rconf2, &mut rdr2));
+ .delimiter(self.flag_delimiter)
+ .no_headers(self.flag_no_headers)
+ .select(self.arg_columns2.clone());
+
+ let mut rdr1 = rconf1.reader_file()?;
+ let mut rdr2 = rconf2.reader_file()?;
+ let (sel1, sel2) = self.get_selections(
+ &rconf1, &mut rdr1, &rconf2, &mut rdr2)?;
Ok(IoState {
- wtr: try!(Config::new(&self.flag_output).writer()),
+ wtr: Config::new(&self.flag_output).writer()?,
rdr1: rdr1,
sel1: sel1,
rdr2: rdr2,
@@ -314,15 +304,15 @@ impl Args {
})
}
- fn get_selections<R: io::Read>
- (&self,
- rconf1: &Config, rdr1: &mut csv::Reader<R>,
- rconf2: &Config, rdr2: &mut csv::Reader<R>)
- -> CliResult<(Selection, Selection)> {
- let headers1 = try!(rdr1.byte_headers());
- let headers2 = try!(rdr2.byte_headers());
- let select1 = try!(rconf1.selection(&*headers1));
- let select2 = try!(rconf2.selection(&*headers2));
+ fn get_selections<R: io::Read>(
+ &self,
+ rconf1: &Config, rdr1: &mut csv::Reader<R>,
+ rconf2: &Config, rdr2: &mut csv::Reader<R>,
+ ) -> CliResult<(Selection, Selection)> {
+ let headers1 = rdr1.byte_headers()?;
+ let headers2 = rdr2.byte_headers()?;
+ let select1 = rconf1.selection(&*headers1)?;
+ let select2 = rconf2.selection(&*headers2)?;
if select1.len() != select2.len() {
return fail!(format!(
"Column selections must have the same number of columns, \
@@ -341,40 +331,44 @@ struct ValueIndex<R> {
}
impl<R: io::Read + io::Seek> ValueIndex<R> {
- fn new(mut rdr: csv::Reader<R>, sel: &Selection,
- casei: bool, nulls: bool)
- -> CliResult<ValueIndex<R>> {
+ fn new(
+ mut rdr: csv::Reader<R>,
+ sel: &Selection,
+ casei: bool,
+ nulls: bool,
+ ) -> CliResult<ValueIndex<R>> {
let mut val_idx = HashMap::with_capacity(10000);
let mut row_idx = io::Cursor::new(Vec::with_capacity(8 * 10000));
let (mut rowi, mut count) = (0usize, 0usize);
- let row_len = try!(rdr.byte_headers()).len();
// This logic is kind of tricky. Basically, we want to include
// the header row in the line index (because that's what csv::index
// does), but we don't want to include header values in the ValueIndex.
- if !rdr.has_headers {
+ if !rdr.has_headers() {
// ... so if there are no headers, we seek to the beginning and
// index everything.
- try!(rdr.seek(0));
+ let mut pos = csv::Position::new();
+ pos.set_byte(0);
+ rdr.seek(pos)?;
} else {
// ... and if there are headers, we make sure that we've parsed
// them, and write the offset of the header row to the index.
- try!(rdr.byte_headers());
- try!(row_idx.write_u64::<BigEndian>(0));
+ rdr.byte_headers()?;
+ row_idx.write_u64::<BigEndian>(0)?;
count += 1;
}
- while !rdr.done() {
- // This is a bit hokey. We're doing this manually instead of
- // calling `csv::index::create` so we can create both indexes
- // in one pass.
- try!(row_idx.write_u64::<BigEndian>(rdr.byte_offset()));
-
- let mut row = Vec::with_capacity(row_len);
- while let Some(r) = rdr.next_bytes().into_iter_result() {
- row.push(try!(r).to_vec());
- }
- let fields: Vec<_> = sel.select(&row).map(|v| transform(v, casei)).collect();
+ let mut row = csv::ByteRecord::new();
+ while rdr.read_byte_record(&mut row)? {
+ // This is a bit hokey. We're doing this manually instead of using
+ // the `csv-index` crate directly so that we can create both
+ // indexes in one pass.
+ row_idx.write_u64::<BigEndian>(row.position().unwrap().byte())?;
+
+ let fields: Vec<_> = sel
+ .select(&row)
+ .map(|v| transform(v, casei))
+ .collect();
if nulls || !fields.iter().any(|f| f.is_empty()) {
match val_idx.entry(fields) {
Entry::Vacant(v) => {
@@ -382,17 +376,20 @@ impl<R: io::Read + io::Seek> ValueIndex<R> {
rows.push(rowi);
v.insert(rows);
}
- Entry::Occupied(mut v) => { v.get_mut().push(rowi); }
+ Entry::Occupied(mut v) => {
+ v.get_mut().push(rowi);
+ }
}
}
rowi += 1;
count += 1;
}
- try!(row_idx.write_u64::<BigEndian>(count as u64));
+
+ row_idx.write_u64::<BigEndian>(count as u64)?;
+ let idx = Indexed::open(rdr, io::Cursor::new(row_idx.into_inner()))?;
Ok(ValueIndex {
values: val_idx,
- idx: try!(Indexed::open(rdr,
- io::Cursor::new(row_idx.into_inner()))),
+ idx: idx,
num_rows: rowi,
})
}
@@ -408,14 +405,17 @@ impl<R> fmt::Debug for ValueIndex<R> {
let keys = keys.iter()
.map(|k| String::from_utf8(k.to_vec()).unwrap())
.collect::<Vec<_>>();
- try!(writeln!(f, "({}) => {:?}", keys.connect(", "), rows))
+ writeln!(f, "({}) => {:?}", keys.join(", "), rows)?
}
Ok(())
}
}
-fn get_row_key(sel: &Selection, row: &[ByteString], casei: bool)
- -> Vec<ByteString> {
+fn get_row_key(
+ sel: &Selection,
+ row: &csv::ByteRecord,
+ casei: bool,
+) -> Vec<ByteString> {
sel.select(row).map(|v| transform(&v, casei)).collect()
}