diff options
Diffstat (limited to 'run-tests')
-rwxr-xr-x | run-tests | 616 |
1 files changed, 325 insertions, 291 deletions
@@ -3,14 +3,16 @@ import unittest import pprint -from subprocess import PIPE,Popen,STDOUT -import sys,os +from subprocess import PIPE, Popen, STDOUT +import sys +import os import time from tempfile import NamedTemporaryFile + def run_command(cmd_to_run): - p = Popen(cmd_to_run,stdout=PIPE,stderr=PIPE,shell=True) - o,e = p.communicate() + p = Popen(cmd_to_run, stdout=PIPE, stderr=PIPE, shell=True) + o, e = p.communicate() # remove last newline o = o.strip() e = e.strip() @@ -23,7 +25,7 @@ def run_command(cmd_to_run): e = e.split(os.linesep) else: e = [] - return (p.returncode,o,e) + return (p.returncode, o, e) uneven_ls_output = """drwxr-xr-x 2 root root 4096 Jun 11 2012 /selinux drwxr-xr-x 2 root root 4096 Apr 19 2013 /mnt @@ -49,25 +51,31 @@ find_output = """8257537 32 drwxrwxrwt 218 root root 28672 Mar 1 1 """ header_row = 'name,value1,value2' -sample_data_rows = ['a,1,0','b,2,0','c,,0'] -sample_data_rows_with_empty_string = ['a,aaa,0','b,bbb,0','c,,0'] +sample_data_rows = ['a,1,0', 'b,2,0', 'c,,0'] +sample_data_rows_with_empty_string = ['a,aaa,0', 'b,bbb,0', 'c,,0'] sample_data_no_header = "\n".join(sample_data_rows) + "\n" -sample_data_with_empty_string_no_header = "\n".join(sample_data_rows_with_empty_string) + "\n" +sample_data_with_empty_string_no_header = "\n".join( + sample_data_rows_with_empty_string) + "\n" sample_data_with_header = header_row + "\n" + sample_data_no_header sample_data_with_missing_header_names = "name,value1\n" + sample_data_no_header -# Values with leading whitespace -sample_data_rows_with_spaces = ['a,1,0',' b, 2,0','c,,0'] -sample_data_with_spaces_no_header = "\n".join(sample_data_rows_with_spaces) + "\n" +# Values with leading whitespace +sample_data_rows_with_spaces = ['a,1,0', ' b, 2,0', 'c,,0'] +sample_data_with_spaces_no_header = "\n".join( + sample_data_rows_with_spaces) + "\n" header_row_with_spaces = 'name,value 1,value2' -sample_data_with_spaces_with_header = header_row_with_spaces + "\n" + sample_data_with_spaces_no_header +sample_data_with_spaces_with_header = header_row_with_spaces + \ + "\n" + sample_data_with_spaces_no_header + def one_column_warning(e): return e[0].startswith('Warning: column count is one') + class AbstractQTestCase(unittest.TestCase): - def create_file_with_data(self,data,encoding='utf-8'): + + def create_file_with_data(self, data, encoding='utf-8'): tmpfile = NamedTemporaryFile(delete=False) if encoding != 'none' and encoding is not None: tmpfile.write(data.encode(encoding)) @@ -76,32 +84,36 @@ class AbstractQTestCase(unittest.TestCase): tmpfile.close() return tmpfile - def cleanup(self,tmpfile): + def cleanup(self, tmpfile): os.remove(tmpfile.name) + class BasicTests(AbstractQTestCase): + def test_basic_aggregation(self): - retcode,o,e = run_command('seq 1 10 | ./q "select sum(c1),avg(c1) from -"') + retcode, o, e = run_command( + 'seq 1 10 | ./q "select sum(c1),avg(c1) from -"') self.assertTrue(retcode == 0) self.assertTrue(len(o) == 1) self.assertTrue(len(e) == 1) - s = sum(xrange(1,11)) - self.assertTrue(o[0] == '%s %s' % (s,s / 10.0)) + s = sum(xrange(1, 11)) + self.assertTrue(o[0] == '%s %s' % (s, s / 10.0)) self.assertTrue(one_column_warning(e)) def test_gzipped_file(self): - tmpfile = self.create_file_with_data('\x1f\x8b\x08\x08\xf2\x18\x12S\x00\x03xxxxxx\x003\xe42\xe22\xe62\xe12\xe52\xe32\xe7\xb2\xe0\xb2\xe424\xe0\x02\x00\xeb\xbf\x8a\x13\x15\x00\x00\x00',encoding='none') + tmpfile = self.create_file_with_data( + '\x1f\x8b\x08\x08\xf2\x18\x12S\x00\x03xxxxxx\x003\xe42\xe22\xe62\xe12\xe52\xe32\xe7\xb2\xe0\xb2\xe424\xe0\x02\x00\xeb\xbf\x8a\x13\x15\x00\x00\x00', encoding='none') cmd = './q -z "select sum(c1),avg(c1) from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) self.assertTrue(retcode == 0) self.assertTrue(len(o) == 1) self.assertTrue(len(e) == 1) - s = sum(xrange(1,11)) - self.assertTrue(o[0] == '%s %s' % (s,s / 10.0)) + s = sum(xrange(1, 11)) + self.assertTrue(o[0] == '%s %s' % (s, s / 10.0)) self.assertTrue(one_column_warning(e)) self.cleanup(tmpfile) @@ -110,142 +122,144 @@ class BasicTests(AbstractQTestCase): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d " " "select * from %s" -H' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertNotEquals(retcode,0) - self.assertEquals(len(o),0) - self.assertEquals(len(e),3) + self.assertNotEquals(retcode, 0) + self.assertEquals(len(o), 0) + self.assertEquals(len(e), 3) - self.assertTrue(e[0].startswith("Warning: column count is one - did you provide the correct delimiter")) + self.assertTrue(e[0].startswith( + "Warning: column count is one - did you provide the correct delimiter")) self.assertTrue(e[1].startswith("Bad header row")) self.assertTrue("Column name cannot contain commas" in e[2]) - def test_select_one_column(self): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d , "select c1 from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) - - self.assertEquals(retcode,0) - self.assertEquals(len(o),3) - self.assertEquals(len(e),0) + retcode, o, e = run_command(cmd) + + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 3) + self.assertEquals(len(e), 0) - self.assertEquals(" ".join(o),'a b c') + self.assertEquals(" ".join(o), 'a b c') self.cleanup(tmpfile) def test_tab_delimition_parameter(self): - tmpfile = self.create_file_with_data(sample_data_no_header.replace(",","\t")) + tmpfile = self.create_file_with_data( + sample_data_no_header.replace(",", "\t")) cmd = './q -t "select c1,c2,c3 from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) - - self.assertEquals(retcode,0) - self.assertEquals(len(o),3) - self.assertEquals(len(e),0) - self.assertEquals(o[0],sample_data_rows[0].replace(",","\t")) - self.assertEquals(o[1],sample_data_rows[1].replace(",","\t")) - self.assertEquals(o[2],sample_data_rows[2].replace(",","\t")) + retcode, o, e = run_command(cmd) + + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 3) + self.assertEquals(len(e), 0) + self.assertEquals(o[0], sample_data_rows[0].replace(",", "\t")) + self.assertEquals(o[1], sample_data_rows[1].replace(",", "\t")) + self.assertEquals(o[2], sample_data_rows[2].replace(",", "\t")) self.cleanup(tmpfile) def test_tab_delimition_parameter__with_manual_override_attempt(self): - tmpfile = self.create_file_with_data(sample_data_no_header.replace(",","\t")) + tmpfile = self.create_file_with_data( + sample_data_no_header.replace(",", "\t")) cmd = './q -t -d , "select c1,c2,c3 from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) - - self.assertEquals(retcode,0) - self.assertEquals(len(o),3) - self.assertEquals(len(e),0) - self.assertEquals(o[0],sample_data_rows[0].replace(",","\t")) - self.assertEquals(o[1],sample_data_rows[1].replace(",","\t")) - self.assertEquals(o[2],sample_data_rows[2].replace(",","\t")) + retcode, o, e = run_command(cmd) + + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 3) + self.assertEquals(len(e), 0) + self.assertEquals(o[0], sample_data_rows[0].replace(",", "\t")) + self.assertEquals(o[1], sample_data_rows[1].replace(",", "\t")) + self.assertEquals(o[2], sample_data_rows[2].replace(",", "\t")) self.cleanup(tmpfile) def test_output_delimiter(self): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d , -D "|" "select c1,c2,c3 from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),3) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 3) + self.assertEquals(len(e), 0) - self.assertEquals(o[0],sample_data_rows[0].replace(",","|")) - self.assertEquals(o[1],sample_data_rows[1].replace(",","|")) - self.assertEquals(o[2],sample_data_rows[2].replace(",","|")) + self.assertEquals(o[0], sample_data_rows[0].replace(",", "|")) + self.assertEquals(o[1], sample_data_rows[1].replace(",", "|")) + self.assertEquals(o[2], sample_data_rows[2].replace(",", "|")) self.cleanup(tmpfile) def test_output_delimiter_tab_parameter(self): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d , -T "select c1,c2,c3 from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),3) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 3) + self.assertEquals(len(e), 0) - self.assertEquals(o[0],sample_data_rows[0].replace(",","\t")) - self.assertEquals(o[1],sample_data_rows[1].replace(",","\t")) - self.assertEquals(o[2],sample_data_rows[2].replace(",","\t")) + self.assertEquals(o[0], sample_data_rows[0].replace(",", "\t")) + self.assertEquals(o[1], sample_data_rows[1].replace(",", "\t")) + self.assertEquals(o[2], sample_data_rows[2].replace(",", "\t")) self.cleanup(tmpfile) def test_output_delimiter_tab_parameter__with_manual_override_attempt(self): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d , -T -D "|" "select c1,c2,c3 from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),3) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 3) + self.assertEquals(len(e), 0) - self.assertEquals(o[0],sample_data_rows[0].replace(",","\t")) - self.assertEquals(o[1],sample_data_rows[1].replace(",","\t")) - self.assertEquals(o[2],sample_data_rows[2].replace(",","\t")) + self.assertEquals(o[0], sample_data_rows[0].replace(",", "\t")) + self.assertEquals(o[1], sample_data_rows[1].replace(",", "\t")) + self.assertEquals(o[2], sample_data_rows[2].replace(",", "\t")) self.cleanup(tmpfile) def test_stdin_input(self): cmd = 'printf "%s" | ./q -d , "select c1,c2,c3 from -"' % sample_data_no_header - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),3) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 3) + self.assertEquals(len(e), 0) - self.assertEquals(o[0],sample_data_rows[0]) - self.assertEquals(o[1],sample_data_rows[1]) - self.assertEquals(o[2],sample_data_rows[2]) + self.assertEquals(o[0], sample_data_rows[0]) + self.assertEquals(o[1], sample_data_rows[1]) + self.assertEquals(o[2], sample_data_rows[2]) def test_column_separation(self): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d , "select c1,c2,c3 from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),3) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 3) + self.assertEquals(len(e), 0) - self.assertEquals(o[0],sample_data_rows[0]) - self.assertEquals(o[1],sample_data_rows[1]) - self.assertEquals(o[2],sample_data_rows[2]) + self.assertEquals(o[0], sample_data_rows[0]) + self.assertEquals(o[1], sample_data_rows[1]) + self.assertEquals(o[2], sample_data_rows[2]) self.cleanup(tmpfile) - + def test_column_analysis(self): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d , "select c1 from %s" -A' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(o[0],'Table for file: %s' % tmpfile.name) - self.assertEquals(o[1].strip(),'`c1` - text') - self.assertEquals(o[2].strip(),'`c2` - int') - self.assertEquals(o[3].strip(),'`c3` - int') + self.assertEquals(retcode, 0) + self.assertEquals(o[0], 'Table for file: %s' % tmpfile.name) + self.assertEquals(o[1].strip(), '`c1` - text') + self.assertEquals(o[2].strip(), '`c2` - int') + self.assertEquals(o[3].strip(), '`c3` - int') self.cleanup(tmpfile) @@ -253,23 +267,24 @@ class BasicTests(AbstractQTestCase): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d , "select c1 from %s" -A' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(o[0],'Table for file: %s' % tmpfile.name) - self.assertEquals(o[1].strip(),'`c1` - text') - self.assertEquals(o[2].strip(),'`c2` - int') - self.assertEquals(o[3].strip(),'`c3` - int') + self.assertEquals(retcode, 0) + self.assertEquals(o[0], 'Table for file: %s' % tmpfile.name) + self.assertEquals(o[1].strip(), '`c1` - text') + self.assertEquals(o[2].strip(), '`c2` - int') + self.assertEquals(o[3].strip(), '`c3` - int') def test_header_exception_on_numeric_header_data(self): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d , "select * from %s" -A -H' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertNotEquals(retcode,0) - self.assertEquals(len(o),0) - self.assertEquals(len(e),3) - self.assertTrue('Bad header row: Header must contain only strings' in e[0]) + self.assertNotEquals(retcode, 0) + self.assertEquals(len(o), 0) + self.assertEquals(len(e), 3) + self.assertTrue( + 'Bad header row: Header must contain only strings' in e[0]) self.assertTrue("Column name must be a string" in e[1]) self.assertTrue("Column name must be a string" in e[2]) @@ -278,66 +293,68 @@ class BasicTests(AbstractQTestCase): def test_column_analysis_with_header(self): tmpfile = self.create_file_with_data(sample_data_with_header) cmd = './q -d , "select c1 from %s" -A -H' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(o[0],'Table for file: %s' % tmpfile.name) - self.assertEquals(o[1].strip(),'`name` - text') - self.assertEquals(o[2].strip(),'`value1` - int') - self.assertEquals(o[3].strip(),'`value2` - int') + self.assertEquals(retcode, 0) + self.assertEquals(o[0], 'Table for file: %s' % tmpfile.name) + self.assertEquals(o[1].strip(), '`name` - text') + self.assertEquals(o[2].strip(), '`value1` - int') + self.assertEquals(o[3].strip(), '`value2` - int') self.cleanup(tmpfile) def test_data_with_header(self): tmpfile = self.create_file_with_data(sample_data_with_header) cmd = './q -d , "select name from %s" -H' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),3) - self.assertEquals(" ".join(o),"a b c") + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 3) + self.assertEquals(" ".join(o), "a b c") self.cleanup(tmpfile) def test_generated_column_name_warning_when_header_line_exists(self): tmpfile = self.create_file_with_data(sample_data_with_header) cmd = './q -d , "select c3 from %s" -H' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertNotEquals(retcode,0) - self.assertEquals(len(o),0) - self.assertEquals(len(e),2) + self.assertNotEquals(retcode, 0) + self.assertEquals(len(o), 0) + self.assertEquals(len(e), 2) self.assertTrue('no such column: c3' in e[0]) - self.assertEquals(e[1],'Warning - There seems to be a "no such column" error, and -H (header line) exists. Please make sure that you are using the column names from the header line and not the default (cXX) column names') + self.assertEquals( + e[1], 'Warning - There seems to be a "no such column" error, and -H (header line) exists. Please make sure that you are using the column names from the header line and not the default (cXX) column names') self.cleanup(tmpfile) def test_column_analysis_with_unexpected_header(self): tmpfile = self.create_file_with_data(sample_data_with_header) cmd = './q -d , "select c1 from %s" -A' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),4) - self.assertEquals(len(e),1) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 4) + self.assertEquals(len(e), 1) - self.assertEquals(o[0],'Table for file: %s' % tmpfile.name) - self.assertEquals(o[1].strip(),'`c1` - text') - self.assertEquals(o[2].strip(),'`c2` - text') - self.assertEquals(o[3].strip(),'`c3` - text') + self.assertEquals(o[0], 'Table for file: %s' % tmpfile.name) + self.assertEquals(o[1].strip(), '`c1` - text') + self.assertEquals(o[2].strip(), '`c2` - text') + self.assertEquals(o[3].strip(), '`c3` - text') - self.assertEquals(e[0],'Warning - There seems to be header line in the file, but -H has not been specified. All fields will be detected as text fields, and the header line will appear as part of the data') + self.assertEquals( + e[0], 'Warning - There seems to be header line in the file, but -H has not been specified. All fields will be detected as text fields, and the header line will appear as part of the data') self.cleanup(tmpfile) def test_empty_data(self): tmpfile = self.create_file_with_data('') cmd = './q -d , "select c1 from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),0) - self.assertEquals(len(e),1) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 0) + self.assertEquals(len(e), 1) self.assertTrue('Warning - data is empty' in e[0]) @@ -346,11 +363,11 @@ class BasicTests(AbstractQTestCase): def test_empty_data_with_header_param(self): tmpfile = self.create_file_with_data('') cmd = './q -d , "select c1 from %s" -H' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),0) - self.assertEquals(len(e),1) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 0) + self.assertEquals(len(e), 1) self.assertTrue('Warning - data is empty' in e[0]) @@ -359,24 +376,24 @@ class BasicTests(AbstractQTestCase): def test_one_row_of_data_without_header_param(self): tmpfile = self.create_file_with_data(header_row) cmd = './q -d , "select c2 from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),1) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 1) + self.assertEquals(len(e), 0) - self.assertEquals(o[0],'value1') + self.assertEquals(o[0], 'value1') self.cleanup(tmpfile) def test_one_row_of_data_with_header_param(self): tmpfile = self.create_file_with_data(header_row) cmd = './q -d , "select c2 from %s" -H' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),0) - self.assertEquals(len(e),1) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 0) + self.assertEquals(len(e), 1) self.assertTrue('Warning - data is empty' in e[0]) @@ -385,79 +402,83 @@ class BasicTests(AbstractQTestCase): def test_dont_leading_keep_whitespace_in_values(self): tmpfile = self.create_file_with_data(sample_data_with_spaces_no_header) cmd = './q -d , "select c1 from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(e),0) - self.assertEquals(len(o),3) + self.assertEquals(retcode, 0) + self.assertEquals(len(e), 0) + self.assertEquals(len(o), 3) - self.assertEquals(o[0],'a') - self.assertEquals(o[1],'b') - self.assertEquals(o[2],'c') + self.assertEquals(o[0], 'a') + self.assertEquals(o[1], 'b') + self.assertEquals(o[2], 'c') def test_keep_leading_whitespace_in_values(self): tmpfile = self.create_file_with_data(sample_data_with_spaces_no_header) cmd = './q -d , "select c1 from %s" -k' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(e),0) - self.assertEquals(len(o),3) + self.assertEquals(retcode, 0) + self.assertEquals(len(e), 0) + self.assertEquals(len(o), 3) - self.assertEquals(o[0],'a') - self.assertEquals(o[1],' b') - self.assertEquals(o[2],'c') + self.assertEquals(o[0], 'a') + self.assertEquals(o[1], ' b') + self.assertEquals(o[2], 'c') def test_no_impact_of_keeping_leading_whitespace_on_integers(self): tmpfile = self.create_file_with_data(sample_data_with_spaces_no_header) cmd = './q -d , "select c2 from %s" -k -A' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(e),0) - self.assertEquals(len(o),4) + self.assertEquals(retcode, 0) + self.assertEquals(len(e), 0) + self.assertEquals(len(o), 4) - self.assertEquals(o[0],'Table for file: %s' % tmpfile.name) - self.assertEquals(o[1].strip(),'`c1` - text') - self.assertEquals(o[2].strip(),'`c2` - int') - self.assertEquals(o[3].strip(),'`c3` - int') + self.assertEquals(o[0], 'Table for file: %s' % tmpfile.name) + self.assertEquals(o[1].strip(), '`c1` - text') + self.assertEquals(o[2].strip(), '`c2` - int') + self.assertEquals(o[3].strip(), '`c3` - int') def test_spaces_in_header_row(self): - tmpfile = self.create_file_with_data(header_row_with_spaces + "\n" + sample_data_no_header) + tmpfile = self.create_file_with_data( + header_row_with_spaces + "\n" + sample_data_no_header) cmd = './q -d , "select name,\`value 1\` from %s" -H' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(e),0) - self.assertEquals(len(o),3) + self.assertEquals(retcode, 0) + self.assertEquals(len(e), 0) + self.assertEquals(len(o), 3) - self.assertEquals(o[0],'a,1') - self.assertEquals(o[1],'b,2') - self.assertEquals(o[2],'c,') + self.assertEquals(o[0], 'a,1') + self.assertEquals(o[1], 'b,2') + self.assertEquals(o[2], 'c,') def test_column_analysis_for_spaces_in_header_row(self): - tmpfile = self.create_file_with_data(header_row_with_spaces + "\n" + sample_data_no_header) + tmpfile = self.create_file_with_data( + header_row_with_spaces + "\n" + sample_data_no_header) cmd = './q -d , "select name,\`value 1\` from %s" -H -A' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) + + self.assertEquals(retcode, 0) + self.assertEquals(len(e), 0) + self.assertEquals(len(o), 4) - self.assertEquals(retcode,0) - self.assertEquals(len(e),0) - self.assertEquals(len(o),4) + self.assertEquals(o[0], 'Table for file: %s' % tmpfile.name) + self.assertEquals(o[1].strip(), '`name` - text') + self.assertEquals(o[2].strip(), '`value 1` - int') + self.assertEquals(o[3].strip(), '`value2` - int') - self.assertEquals(o[0],'Table for file: %s' % tmpfile.name) - self.assertEquals(o[1].strip(),'`name` - text') - self.assertEquals(o[2].strip(),'`value 1` - int') - self.assertEquals(o[3].strip(),'`value2` - int') class ParsingModeTests(AbstractQTestCase): + def test_strict_mode_column_count_mismatch_error(self): tmpfile = self.create_file_with_data(uneven_ls_output) cmd = './q -m strict "select count(*) from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertNotEquals(retcode,0) - self.assertEquals(len(o),0) - self.assertEquals(len(e),1) + self.assertNotEquals(retcode, 0) + self.assertEquals(len(o), 0) + self.assertEquals(len(e), 1) self.assertTrue("Column Count is expected to identical" in e[0]) @@ -466,235 +487,248 @@ class ParsingModeTests(AbstractQTestCase): def test_strict_mode_too_large_specific_column_count(self): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d , -m strict -c 4 "select count(*) from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertNotEquals(retcode,0) - self.assertEquals(len(o),0) - self.assertEquals(len(e),1) + self.assertNotEquals(retcode, 0) + self.assertEquals(len(o), 0) + self.assertEquals(len(e), 1) - self.assertEquals(e[0],"Strict mode. Column count is expected to be 4 but is 3") + self.assertEquals( + e[0], "Strict mode. Column count is expected to be 4 but is 3") self.cleanup(tmpfile) def test_strict_mode_too_small_specific_column_count(self): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d , -m strict -c 2 "select count(*) from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertNotEquals(retcode,0) - self.assertEquals(len(o),0) - self.assertEquals(len(e),1) + self.assertNotEquals(retcode, 0) + self.assertEquals(len(o), 0) + self.assertEquals(len(e), 1) - self.assertEquals(e[0],"Strict mode. Column count is expected to be 2 but is 3") + self.assertEquals( + e[0], "Strict mode. Column count is expected to be 2 but is 3") self.cleanup(tmpfile) def test_relaxed_mode_missing_columns_in_header(self): - tmpfile = self.create_file_with_data(sample_data_with_missing_header_names) + tmpfile = self.create_file_with_data( + sample_data_with_missing_header_names) cmd = './q -d , -m relaxed "select count(*) from %s" -H -A' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),4) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 4) + self.assertEquals(len(e), 0) - self.assertEquals(o[0],'Table for file: %s' % tmpfile.name) - self.assertEquals(o[1].strip(),'`name` - text') - self.assertEquals(o[2].strip(),'`value1` - int') - self.assertEquals(o[3].strip(),'`c3` - int') + self.assertEquals(o[0], 'Table for file: %s' % tmpfile.name) + self.assertEquals(o[1].strip(), '`name` - text') + self.assertEquals(o[2].strip(), '`value1` - int') + self.assertEquals(o[3].strip(), '`c3` - int') self.cleanup(tmpfile) def test_strict_mode_missing_columns_in_header(self): - tmpfile = self.create_file_with_data(sample_data_with_missing_header_names) + tmpfile = self.create_file_with_data( + sample_data_with_missing_header_names) cmd = './q -d , -m strict "select count(*) from %s" -H -A' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertNotEquals(retcode,0) - self.assertEquals(len(o),0) - self.assertEquals(len(e),1) + self.assertNotEquals(retcode, 0) + self.assertEquals(len(o), 0) + self.assertEquals(len(e), 1) - self.assertEquals(e[0],'Strict mode. Header row contains less columns than expected column count(2 vs 3)') + self.assertEquals( + e[0], 'Strict mode. Header row contains less columns than expected column count(2 vs 3)') self.cleanup(tmpfile) def test_output_delimiter_with_missing_fields(self): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d , "select * from %s" -D ";"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),3) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 3) + self.assertEquals(len(e), 0) - self.assertEquals(o[0],'a;1;0') - self.assertEquals(o[1],'b;2;0') - self.assertEquals(o[2],'c;;0') + self.assertEquals(o[0], 'a;1;0') + self.assertEquals(o[1], 'b;2;0') + self.assertEquals(o[2], 'c;;0') self.cleanup(tmpfile) def test_handling_of_null_integers(self): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d , "select avg(c2) from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),1) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 1) + self.assertEquals(len(e), 0) - self.assertEquals(o[0],'1.5') + self.assertEquals(o[0], '1.5') self.cleanup(tmpfile) def test_empty_integer_values_converted_to_null(self): tmpfile = self.create_file_with_data(sample_data_no_header) cmd = './q -d , "select * from %s where c2 is null"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),1) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 1) + self.assertEquals(len(e), 0) - self.assertEquals(o[0],'c,,0') + self.assertEquals(o[0], 'c,,0') self.cleanup(tmpfile) def test_empty_string_values_not_converted_to_null(self): - tmpfile = self.create_file_with_data(sample_data_with_empty_string_no_header) - cmd = './q -d , "select * from %s where c2 == %s"' % (tmpfile.name,"''") - retcode,o,e = run_command(cmd) + tmpfile = self.create_file_with_data( + sample_data_with_empty_string_no_header) + cmd = './q -d , "select * from %s where c2 == %s"' % ( + tmpfile.name, "''") + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),1) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 1) + self.assertEquals(len(e), 0) - self.assertEquals(o[0],'c,,0') + self.assertEquals(o[0], 'c,,0') self.cleanup(tmpfile) - + def test_relaxed_mode_detected_columns(self): tmpfile = self.create_file_with_data(uneven_ls_output) cmd = './q -m relaxed "select count(*) from %s" -A' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(e), 0) table_name_row = o[0] column_rows = o[1:] - self.assertEquals(len(column_rows),11) - + self.assertEquals(len(column_rows), 11) + column_tuples = [x.strip().split(" ") for x in column_rows] - column_info = [(x[0],x[2]) for x in column_tuples] + column_info = [(x[0], x[2]) for x in column_tuples] column_names = [x[0] for x in column_tuples] column_types = [x[2] for x in column_tuples] - self.assertEquals(column_names,['`c%s`' % x for x in xrange(1,12)]) - self.assertEquals(column_types,['text','int','text','text','int','text','int','int','text','text','text']) - + self.assertEquals(column_names, ['`c%s`' % x for x in xrange(1, 12)]) + self.assertEquals(column_types, [ + 'text', 'int', 'text', 'text', 'int', 'text', 'int', 'int', 'text', 'text', 'text']) + self.cleanup(tmpfile) def test_relaxed_mode_detected_columns_with_specific_column_count(self): tmpfile = self.create_file_with_data(uneven_ls_output) cmd = './q -m relaxed "select count(*) from %s" -A -c 9' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(e), 0) table_name_row = o[0] column_rows = o[1:] - self.assertEquals(len(column_rows),9) - + self.assertEquals(len(column_rows), 9) + column_tuples = [x.strip().split(" ") for x in column_rows] - column_info = [(x[0],x[2]) for x in column_tuples] + column_info = [(x[0], x[2]) for x in column_tuples] column_names = [x[0] for x in column_tuples] column_types = [x[2] for x in column_tuples] - self.assertEquals(column_names,['`c%s`' % x for x in xrange(1,10)]) - self.assertEquals(column_types,['text','int','text','text','int','text','int','int','text']) - + self.assertEquals(column_names, ['`c%s`' % x for x in xrange(1, 10)]) + self.assertEquals( + column_types, ['text', 'int', 'text', 'text', 'int', 'text', 'int', 'int', 'text']) + self.cleanup(tmpfile) def test_relaxed_mode_last_column_data_with_specific_column_count(self): tmpfile = self.create_file_with_data(uneven_ls_output) cmd = './q -m relaxed "select c9 from %s" -c 9' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) + + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 9) + self.assertEquals(len(e), 0) - self.assertEquals(retcode,0) - self.assertEquals(len(o),9) - self.assertEquals(len(e),0) + expected_output = ["/selinux", "/mnt", "/srv", "/lost+found", "/initrd.img.old -> /boot/initrd.img-3.8.0-19-generic", + "/cdrom", "/home", "/vmlinuz -> boot/vmlinuz-3.8.0-19-generic", "/initrd.img -> boot/initrd.img-3.8.0-19-generic"] - expected_output = ["/selinux","/mnt","/srv","/lost+found","/initrd.img.old -> /boot/initrd.img-3.8.0-19-generic","/cdrom","/home","/vmlinuz -> boot/vmlinuz-3.8.0-19-generic","/initrd.img -> boot/initrd.img-3.8.0-19-generic"] + self.assertEquals(o, expected_output) - self.assertEquals(o,expected_output) - self.cleanup(tmpfile) def test_fluffy_mode(self): tmpfile = self.create_file_with_data(uneven_ls_output) cmd = './q -m fluffy "select c9 from %s"' % tmpfile.name - retcode,o,e = run_command(cmd) + retcode, o, e = run_command(cmd) - self.assertEquals(retcode,0) - self.assertEquals(len(o),9) - self.assertEquals(len(e),0) + self.assertEquals(retcode, 0) + self.assertEquals(len(o), 9) + self.assertEquals(len(e), 0) - expected_output = ["/selinux","/mnt","/srv","/lost+found","/initrd.img.old","/cdrom","/home","/vmlinuz","/initrd.img"] + expected_output = ["/selinux", "/mnt", "/srv", "/lost+found", + "/initrd.img.old", "/cdrom", "/home", "/vmlinuz", "/initrd.img"] + + self.assertEquals(o, expected_output) - self.assertEquals(o,expected_output) - self.cleanup(tmpfile) + |