use std::io::{mod, File};
use std::os;
use csv::{mod, ByteString};
use csv::index::Indexed;
use stats::{Frequencies, merge_all};
use CliResult;
use config::{Config, Delimiter};
use select::{SelectColumns, Selection};
use util;
// Docopt usage string. The `<input>`, `<arg>` and `<file>` placeholders are
// load-bearing: docopt uses them to bind `arg_input`, `flag_select`,
// `flag_limit`, `flag_jobs`, `flag_output` and `flag_delimiter` in `Args`.
static USAGE: &'static str = "
Compute a frequency table on CSV data.

The frequency table is formatted as CSV data:

    field,value,count

By default, there is a row for the N most frequent values for each field in the
data. The order and number of values can be tweaked with --asc and --limit,
respectively.

Since this computes an exact frequency table, memory proportional to the
cardinality of each column is required.

Usage:
    xsv frequency [options] [<input>]

frequency options:
    -s, --select <arg>     Select a subset of columns to compute frequencies
                           for. See 'xsv select --help' for the format
                           details. This is provided here because piping 'xsv
                           select' into 'xsv frequency' will disable the use
                           of indexing.
    -l, --limit <arg>      Limit the frequency table to the N most common
                           items. Set to '0' to disable a limit.
                           [default: 10]
    -a, --asc              Sort the frequency tables in ascending order by
                           count. The default is descending order.
    --no-nulls             Don't include NULLs in the frequency table.
    -j, --jobs <arg>       The number of jobs to run in parallel.
                           This works better when the given CSV data has
                           an index already created. Note that a file handle
                           is opened for each job.
                           When set to '0', the number of jobs is set to the
                           number of CPUs detected.
                           [default: 0]

Common options:
    -h, --help             Display this message
    -o, --output <file>    Write output to <file> instead of stdout.
    -n, --no-headers       When set, the first row will NOT be included
                           in the frequency table. Additionally, the 'field'
                           column will be 1-based indices instead of header
                           names.
    -d, --delimiter <arg>  The field delimiter for reading CSV data.
                           Must be a single character. (default: ,)
";
#[deriving(Clone, Decodable)]
struct Args {
arg_input: Option,
flag_select: SelectColumns,
flag_limit: uint,
flag_asc: bool,
flag_no_nulls: bool,
flag_jobs: uint,
flag_output: Option,
flag_no_headers: bool,
flag_delimiter: Option,
}
/// Entry point for the `frequency` subcommand.
///
/// Parses `argv`, computes one frequency table per selected column —
/// in parallel when the CSV has an index and more than one job is
/// requested — and writes the result as `field,value,count` CSV rows.
pub fn run(argv: &[&str]) -> CliResult<()> {
    let args: Args = try!(util::get_args(USAGE, argv));
    let rconfig = args.rconfig();
    let mut wtr = try!(Config::new(&args.flag_output).writer());
    // Only take the parallel path when an index file exists AND >1 job was
    // requested; otherwise stream the input sequentially.
    let (headers, tables) = try!(match try!(args.rconfig().indexed()) {
        Some(ref mut idx) if args.njobs() > 1 => args.parallel_ftables(idx),
        _ => args.sequential_ftables(),
    });
    try!(wtr.write(vec!["field", "value", "count"].into_iter()));
    // Pair each selected header with its computed frequency table.
    let head_ftables = headers.into_iter().zip(tables.into_iter());
    for (i, (mut header, ftab)) in head_ftables.enumerate() {
        if rconfig.no_headers {
            // No header row in the input: label columns by 1-based index.
            header = ByteString::from_bytes((i+1).to_string());
        }
        // One output row per (value, count) entry, already limited/sorted
        // by Args::counts.
        for (value, count) in args.counts(&ftab).into_iter() {
            let count = count.to_string();
            let row = vec![header[], value[], count.as_bytes()];
            try!(wtr.write_bytes(row.into_iter()));
        }
    }
    Ok(())
}
type ByteRow = Vec;
type Headers = ByteRow;
type FTable = Frequencies;
type FTables = Vec>;
impl Args {
    /// Builds the CSV reader configuration from the parsed flags,
    /// including the column selection.
    fn rconfig(&self) -> Config {
        Config::new(&self.arg_input)
        .delimiter(self.flag_delimiter)
        .no_headers(self.flag_no_headers)
        .select(self.flag_select.clone())
    }
    /// Extracts the (value, count) pairs from one frequency table,
    /// sorted per --asc, truncated per --limit, with empty values
    /// rendered as the literal "(NULL)".
    fn counts(&self, ftab: &FTable) -> Vec<(ByteString, u64)> {
        let mut counts = if self.flag_asc {
            ftab.least_frequent()
        } else {
            ftab.most_frequent()
        };
        // A limit of 0 means "no limit" (see USAGE).
        if self.flag_limit > 0 {
            counts = counts.into_iter().take(self.flag_limit).collect();
        }
        counts.into_iter().map(|(bs, c)| {
            if b"" == bs.as_slice() {
                // Empty fields were counted under the empty byte string;
                // present them as "(NULL)" in the output.
                (ByteString::from_bytes(b"(NULL)"), c)
            } else {
                (bs.clone(), c)
            }
        }).collect()
    }
    /// Single-threaded path: read all records in one pass and tabulate.
    fn sequential_ftables(&self) -> CliResult<(Headers, FTables)> {
        let mut rdr = try!(self.rconfig().reader());
        let (headers, sel) = try!(self.sel_headers(&mut rdr));
        Ok((headers, try!(self.ftables(&sel, rdr.byte_records()))))
    }
    /// Parallel path: split the indexed CSV into njobs() chunks, tabulate
    /// each chunk on a task pool, then merge the partial tables.
    /// Requires a pre-built index (`idx`) for random seeking.
    fn parallel_ftables(&self, idx: &mut Indexed)
    -> CliResult<(Headers, FTables)> {
        use std::comm::channel;
        use std::sync::TaskPool;
        let mut rdr = try!(self.rconfig().reader());
        let (headers, sel) = try!(self.sel_headers(&mut rdr));
        if idx.count() == 0 {
            // Empty input: nothing to tabulate.
            return Ok((headers, vec![]));
        }
        let chunk_size = util::chunk_size(idx.count() as uint, self.njobs());
        let nchunks = util::num_of_chunks(idx.count() as uint, chunk_size);
        let pool = TaskPool::new(self.njobs());
        let (send, recv) = channel();
        for i in range(0, nchunks) {
            let (send, args, sel) = (send.clone(), self.clone(), sel.clone());
            pool.execute(move || {
                // Each task opens its own file handle on the index (see
                // USAGE note on --jobs) and seeks to its chunk's start.
                let mut idx = args.rconfig().indexed().unwrap().unwrap();
                idx.seek((i * chunk_size) as u64).unwrap();
                let it = idx.csv().byte_records().take(chunk_size);
                send.send(args.ftables(&sel, it).unwrap());
            });
        }
        // Close our sender so the receiver iterator terminates once all
        // tasks have reported.
        drop(send);
        // merge_all folds the per-chunk tables into one set of totals.
        Ok((headers, merge_all(recv.iter()).unwrap()))
    }
    /// Tabulates frequencies for every record produced by `it`, one table
    /// per column in the (normalized) selection. Fields are trimmed before
    /// counting; empty fields count under the empty byte string unless
    /// --no-nulls was given.
    fn ftables>>
    (&self, sel: &Selection, mut it: I)
    -> CliResult {
        let null = ByteString::from_bytes(b"");
        let nsel = sel.normal();
        let mut tabs = Vec::from_fn(nsel.len(), |_| Frequencies::new());
        for row in it {
            let row = try!(row);
            for (i, field) in nsel.select(row.into_iter()).enumerate() {
                if !field.is_empty() {
                    tabs[i].add(trim(field));
                } else {
                    if !self.flag_no_nulls {
                        tabs[i].add(null.clone());
                    }
                }
            }
        }
        Ok(tabs)
    }
    /// Reads the header row and resolves the --select expression against
    /// it, returning the selected header names plus the selection.
    fn sel_headers(&self, rdr: &mut csv::Reader)
    -> CliResult<(ByteRow, Selection)> {
        let headers = try!(rdr.byte_headers());
        let sel = try!(self.rconfig().selection(headers[]));
        Ok((sel.select(headers[]).map(ByteString::from_bytes).collect(), sel))
    }
    /// Resolves --jobs: 0 means "one job per detected CPU".
    fn njobs(&self) -> uint {
        if self.flag_jobs == 0 { os::num_cpus() } else { self.flag_jobs }
    }
}
/// Trims leading/trailing whitespace from a field when it is valid UTF-8;
/// byte strings that are not valid UTF-8 are returned unchanged.
fn trim(bs: ByteString) -> ByteString {
    let decoded = bs.into_utf8_string();
    match decoded {
        Err(raw) => raw,
        Ok(text) => ByteString::from_bytes(text.trim()),
    }
}