use std::io::{mod, File};
use std::os;

use csv::{mod, ByteString};
use csv::index::Indexed;
use stats::{Frequencies, merge_all};

use CliResult;
use config::{Config, Delimiter};
use select::{SelectColumns, Selection};
use util;

static USAGE: &'static str = "
Compute a frequency table on CSV data.

The frequency table is formatted as CSV data:

    field,value,count

By default, there is a row for the N most frequent values for each field in the
data. The order and number of values can be tweaked with --asc and --limit,
respectively.

Since this computes an exact frequency table, memory proportional to the
cardinality of each column is required.

Usage:
    xsv frequency [options] [<input>]

frequency options:
    -s, --select <arg>     Select a subset of columns to compute frequencies
                           for. See 'xsv select --help' for the format
                           details. This is provided here because piping
                           'xsv select' into 'xsv frequency' will disable the
                           use of indexing.
    -l, --limit <arg>      Limit the frequency table to the N most common
                           items. Set to '0' to disable a limit.
                           [default: 10]
    -a, --asc              Sort the frequency tables in ascending order by
                           count. The default is descending order.
    --no-nulls             Don't include NULLs in the frequency table.
    -j, --jobs <arg>       The number of jobs to run in parallel.
                           This works better when the given CSV data has
                           an index already created. Note that a file handle
                           is opened for each job.
                           When set to '0', the number of jobs is set to the
                           number of CPUs detected.
                           [default: 0]

Common options:
    -h, --help             Display this message
    -o, --output <file>    Write output to <file> instead of stdout.
    -n, --no-headers       When set, the first row will NOT be included
                           in the frequency table. Additionally, the 'field'
                           column will be 1-based indices instead of header
                           names.
    -d, --delimiter <arg>  The field delimiter for reading CSV data.
                           Must be a single character. (default: ,)
";

/// Command-line arguments for `xsv frequency`, decoded from USAGE by docopt.
#[deriving(Clone, Decodable)]
struct Args {
    arg_input: Option<String>,
    flag_select: SelectColumns,
    flag_limit: uint,
    flag_asc: bool,
    flag_no_nulls: bool,
    flag_jobs: uint,
    flag_output: Option<String>,
    flag_no_headers: bool,
    flag_delimiter: Option<Delimiter>,
}

/// Entry point for `xsv frequency`: builds the per-column frequency tables
/// (in parallel when an index exists and more than one job is requested)
/// and writes them out as `field,value,count` rows.
pub fn run(argv: &[&str]) -> CliResult<()> {
    let args: Args = try!(util::get_args(USAGE, argv));
    let rconfig = args.rconfig();

    let mut wtr = try!(Config::new(&args.flag_output).writer());
    // Use the indexed (parallel) path only when an index is present AND
    // more than one job was requested; otherwise fall back to a single
    // sequential pass over the data.
    let (headers, tables) = try!(match try!(args.rconfig().indexed()) {
        Some(ref mut idx) if args.njobs() > 1 => args.parallel_ftables(idx),
        _ => args.sequential_ftables(),
    });

    try!(wtr.write(vec!["field", "value", "count"].into_iter()));
    let head_ftables = headers.into_iter().zip(tables.into_iter());
    for (i, (mut header, ftab)) in head_ftables.enumerate() {
        if rconfig.no_headers {
            // Without headers, the 'field' column is a 1-based column index.
            header = ByteString::from_bytes((i+1).to_string());
        }
        for (value, count) in args.counts(&ftab).into_iter() {
            let count = count.to_string();
            let row = vec![header[], value[], count.as_bytes()];
            try!(wtr.write_bytes(row.into_iter()));
        }
    }
    Ok(())
}

type ByteRow = Vec<ByteString>;
type Headers = ByteRow;
type FTable = Frequencies<ByteString>;
type FTables = Vec<FTable>;

impl Args {
    /// CSV reader configuration derived from the command-line flags.
    fn rconfig(&self) -> Config {
        Config::new(&self.arg_input)
               .delimiter(self.flag_delimiter)
               .no_headers(self.flag_no_headers)
               .select(self.flag_select.clone())
    }

    /// Extract the (value, count) pairs to report for one column's table,
    /// honoring --asc and --limit, and replacing the empty value with the
    /// literal string "(NULL)" for display.
    fn counts(&self, ftab: &FTable) -> Vec<(ByteString, u64)> {
        let mut counts = if self.flag_asc {
            ftab.least_frequent()
        } else {
            ftab.most_frequent()
        };
        // --limit 0 means "no limit".
        if self.flag_limit > 0 {
            counts = counts.into_iter().take(self.flag_limit).collect();
        }
        counts.into_iter().map(|(bs, c)| {
            if b"" == bs.as_slice() {
                (ByteString::from_bytes(b"(NULL)"), c)
            } else {
                (bs.clone(), c)
            }
        }).collect()
    }

    /// Build the frequency tables with a single pass over the CSV data.
    fn sequential_ftables(&self) -> CliResult<(Headers, FTables)> {
        let mut rdr = try!(self.rconfig().reader());
        let (headers, sel) = try!(self.sel_headers(&mut rdr));
        Ok((headers, try!(self.ftables(&sel, rdr.byte_records()))))
    }

    /// Build the frequency tables in parallel using the CSV index: the rows
    /// are split into chunks, each chunk is counted in its own task with its
    /// own file handle, and the partial tables are merged at the end.
    fn parallel_ftables(&self, idx: &mut Indexed<io::File, io::File>)
                       -> CliResult<(Headers, FTables)> {
        use std::comm::channel;
        use std::sync::TaskPool;

        let mut rdr = try!(self.rconfig().reader());
        let (headers, sel) = try!(self.sel_headers(&mut rdr));

        if idx.count() == 0 {
            return Ok((headers, vec![]));
        }

        let chunk_size = util::chunk_size(idx.count() as uint, self.njobs());
        let nchunks = util::num_of_chunks(idx.count() as uint, chunk_size);

        let pool = TaskPool::new(self.njobs());
        let (send, recv) = channel();
        for i in range(0, nchunks) {
            let (send, args, sel) = (send.clone(), self.clone(), sel.clone());
            pool.execute(move || {
                // Each task re-opens the indexed reader (one file handle per
                // job) and seeks to the start of its chunk.
                let mut idx = args.rconfig().indexed().unwrap().unwrap();
                idx.seek((i * chunk_size) as u64).unwrap();
                let it = idx.csv().byte_records().take(chunk_size);
                send.send(args.ftables(&sel, it).unwrap());
            });
        }
        // Drop our sender so `recv.iter()` terminates once all tasks finish.
        drop(send);
        Ok((headers, merge_all(recv.iter()).unwrap()))
    }

    /// Count field values from an iterator of byte records, restricted to
    /// the selected columns. Empty fields are counted as NULL unless
    /// --no-nulls was given; non-empty fields are trimmed before counting.
    fn ftables<I: Iterator<csv::CsvResult<ByteRow>>>
              (&self, sel: &Selection, mut it: I) -> CliResult<FTables> {
        let null = ByteString::from_bytes(b"");
        let nsel = sel.normal();
        let mut tabs = Vec::from_fn(nsel.len(), |_| Frequencies::new());
        for row in it {
            let row = try!(row);
            for (i, field) in nsel.select(row.into_iter()).enumerate() {
                if !field.is_empty() {
                    tabs[i].add(trim(field));
                } else {
                    if !self.flag_no_nulls {
                        tabs[i].add(null.clone());
                    }
                }
            }
        }
        Ok(tabs)
    }

    /// Read the headers from `rdr` and resolve the --select expression
    /// against them, returning the selected header names and the selection.
    fn sel_headers<R: Reader>(&self, rdr: &mut csv::Reader<R>)
                  -> CliResult<(ByteRow, Selection)> {
        let headers = try!(rdr.byte_headers());
        let sel = try!(self.rconfig().selection(headers[]));
        Ok((sel.select(headers[]).map(ByteString::from_bytes).collect(), sel))
    }

    /// Number of parallel jobs: --jobs, or the CPU count when it is 0.
    fn njobs(&self) -> uint {
        if self.flag_jobs == 0 { os::num_cpus() } else { self.flag_jobs }
    }
}

/// Trim leading/trailing whitespace from a field when it is valid UTF-8;
/// non-UTF-8 fields are returned unchanged.
fn trim(bs: ByteString) -> ByteString {
    match bs.into_utf8_string() {
        Ok(s) => ByteString::from_bytes(s.trim()),
        Err(bs) => bs,
    }
}