diff options
author | Bastian Venthur <bastian.venthur@flixbus.com> | 2018-10-15 11:56:39 +0200 |
---|---|---|
committer | Brian May <brian@linuxpenguins.xyz> | 2018-10-17 20:52:04 +1100 |
commit | 842768f9cf6c3ce6638c1b741ae8f8b2a79ae66a (patch) | |
tree | ad0abfa3a03aa3038b061723ba9c03e857663104 /tests | |
parent | 97ed2030f340113908fa218f90964767415c55ab (diff) |
Moved sshuttle/tests into tests to.
Having the tests in a `tests` directory in root is the most common
approach. Also moved pytest's conftest.py into `tests` making the
fixture available for client and server tests.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/client/test_firewall.py | 136 | ||||
-rw-r--r-- | tests/client/test_helpers.py | 201 | ||||
-rw-r--r-- | tests/client/test_methods_nat.py | 162 | ||||
-rw-r--r-- | tests/client/test_methods_pf.py | 487 | ||||
-rw-r--r-- | tests/client/test_methods_tproxy.py | 297 | ||||
-rw-r--r-- | tests/client/test_options.py | 98 | ||||
-rw-r--r-- | tests/client/test_sdnotify.py | 64 | ||||
-rw-r--r-- | tests/conftest.py | 10 | ||||
-rw-r--r-- | tests/server/test_server.py | 58 |
9 files changed, 1513 insertions, 0 deletions
diff --git a/tests/client/test_firewall.py b/tests/client/test_firewall.py new file mode 100644 index 0000000..3174e9c --- /dev/null +++ b/tests/client/test_firewall.py @@ -0,0 +1,136 @@ +from mock import Mock, patch, call +import io +from socket import AF_INET, AF_INET6 + +import sshuttle.firewall + + +def setup_daemon(): + stdin = io.StringIO(u"""ROUTES +{inet},24,0,1.2.3.0,8000,9000 +{inet},32,1,1.2.3.66,8080,8080 +{inet6},64,0,2404:6800:4004:80c::,0,0 +{inet6},128,1,2404:6800:4004:80c::101f,80,80 +NSLIST +{inet},1.2.3.33 +{inet6},2404:6800:4004:80c::33 +PORTS 1024,1025,1026,1027 +GO 1 - +HOST 1.2.3.3,existing +""".format(inet=AF_INET, inet6=AF_INET6)) + stdout = Mock() + return stdin, stdout + + +def test_rewrite_etc_hosts(tmpdir): + orig_hosts = tmpdir.join("hosts.orig") + orig_hosts.write("1.2.3.3 existing\n") + + new_hosts = tmpdir.join("hosts") + orig_hosts.copy(new_hosts) + + hostmap = { + 'myhost': '1.2.3.4', + 'myotherhost': '1.2.3.5', + } + with patch('sshuttle.firewall.HOSTSFILE', new=str(new_hosts)): + sshuttle.firewall.rewrite_etc_hosts(hostmap, 10) + + with new_hosts.open() as f: + line = f.readline() + s = line.split() + assert s == ['1.2.3.3', 'existing'] + + line = f.readline() + s = line.split() + assert s == ['1.2.3.4', 'myhost', + '#', 'sshuttle-firewall-10', 'AUTOCREATED'] + + line = f.readline() + s = line.split() + assert s == ['1.2.3.5', 'myotherhost', + '#', 'sshuttle-firewall-10', 'AUTOCREATED'] + + line = f.readline() + assert line == "" + + with patch('sshuttle.firewall.HOSTSFILE', new=str(new_hosts)): + sshuttle.firewall.restore_etc_hosts(10) + assert orig_hosts.computehash() == new_hosts.computehash() + + +def test_subnet_weight(): + subnets = [ + (AF_INET, 16, 0, '192.168.0.0', 0, 0), + (AF_INET, 24, 0, '192.168.69.0', 0, 0), + (AF_INET, 32, 0, '192.168.69.70', 0, 0), + (AF_INET, 32, 1, '192.168.69.70', 0, 0), + (AF_INET, 32, 1, '192.168.69.70', 80, 80), + (AF_INET, 0, 1, '0.0.0.0', 0, 0), + (AF_INET, 0, 1, '0.0.0.0', 8000, 9000), + (AF_INET, 0, 1, '0.0.0.0', 8000, 8500), + (AF_INET, 0, 1, '0.0.0.0', 8000, 8000), + (AF_INET, 0, 1, '0.0.0.0', 400, 450) + ] + subnets_sorted = [ + (AF_INET, 32, 1, '192.168.69.70', 80, 80), + (AF_INET, 0, 1, '0.0.0.0', 8000, 8000), + (AF_INET, 0, 1, '0.0.0.0', 400, 450), + (AF_INET, 0, 1, '0.0.0.0', 8000, 8500), + (AF_INET, 0, 1, '0.0.0.0', 8000, 9000), + (AF_INET, 32, 1, '192.168.69.70', 0, 0), + (AF_INET, 32, 0, '192.168.69.70', 0, 0), + (AF_INET, 24, 0, '192.168.69.0', 0, 0), + (AF_INET, 16, 0, '192.168.0.0', 0, 0), + (AF_INET, 0, 1, '0.0.0.0', 0, 0) + ] + + assert subnets_sorted == \ + sorted(subnets, key=sshuttle.firewall.subnet_weight, reverse=True) + + +@patch('sshuttle.firewall.rewrite_etc_hosts') +@patch('sshuttle.firewall.setup_daemon') +@patch('sshuttle.firewall.get_method') +def test_main(mock_get_method, mock_setup_daemon, mock_rewrite_etc_hosts): + stdin, stdout = setup_daemon() + mock_setup_daemon.return_value = stdin, stdout + + mock_get_method("not_auto").name = "test" + mock_get_method.reset_mock() + + sshuttle.firewall.main("not_auto", False) + + assert mock_rewrite_etc_hosts.mock_calls == [ + call({'1.2.3.3': 'existing'}, 1024), + call({}, 1024), + ] + + assert stdout.mock_calls == [ + call.write('READY test\n'), + call.flush(), + call.write('STARTED\n'), + call.flush() + ] + assert mock_setup_daemon.mock_calls == [call()] + assert mock_get_method.mock_calls == [ + call('not_auto'), + call().setup_firewall( + 1024, 1026, + [(AF_INET6, u'2404:6800:4004:80c::33')], + AF_INET6, + [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 0, 0), + (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 80, 80)], + True, + None), + call().setup_firewall( + 1025, 1027, + [(AF_INET, u'1.2.3.33')], + AF_INET, + [(AF_INET, 24, False, u'1.2.3.0', 8000, 9000), + (AF_INET, 32, True, u'1.2.3.66', 8080, 8080)], + True, + None), + call().restore_firewall(1024, AF_INET6, True, None), + call().restore_firewall(1025, AF_INET, True, None), + ] diff --git a/tests/client/test_helpers.py b/tests/client/test_helpers.py new file mode 100644 index 0000000..7359ec2 --- /dev/null +++ b/tests/client/test_helpers.py @@ -0,0 +1,201 @@ +from mock import patch, call +import sys +import io +import socket +from socket import AF_INET, AF_INET6 +import errno + +import sshuttle.helpers + + +@patch('sshuttle.helpers.logprefix', new='prefix: ') +@patch('sshuttle.helpers.sys.stdout') +@patch('sshuttle.helpers.sys.stderr') +def test_log(mock_stderr, mock_stdout): + sshuttle.helpers.log("message") + sshuttle.helpers.log("abc") + sshuttle.helpers.log("message 1\n") + sshuttle.helpers.log("message 2\nline2\nline3\n") + sshuttle.helpers.log("message 3\nline2\nline3") + assert mock_stdout.mock_calls == [ + call.flush(), + call.flush(), + call.flush(), + call.flush(), + call.flush(), + ] + assert mock_stderr.mock_calls == [ + call.write('prefix: message'), + call.flush(), + call.write('prefix: abc'), + call.flush(), + call.write('prefix: message 1\n'), + call.flush(), + call.write('prefix: message 2\n'), + call.write('---> line2\n'), + call.write('---> line3\n'), + call.flush(), + call.write('prefix: message 3\n'), + call.write('---> line2\n'), + call.write('---> line3\n'), + call.flush(), + ] + + +@patch('sshuttle.helpers.logprefix', new='prefix: ') +@patch('sshuttle.helpers.verbose', new=1) +@patch('sshuttle.helpers.sys.stdout') +@patch('sshuttle.helpers.sys.stderr') +def test_debug1(mock_stderr, mock_stdout): + sshuttle.helpers.debug1("message") + assert mock_stdout.mock_calls == [ + call.flush(), + ] + assert mock_stderr.mock_calls == [ + call.write('prefix: message'), + call.flush(), + ] + + +@patch('sshuttle.helpers.logprefix', new='prefix: ') +@patch('sshuttle.helpers.verbose', new=0) +@patch('sshuttle.helpers.sys.stdout') +@patch('sshuttle.helpers.sys.stderr') +def test_debug1_nop(mock_stderr, mock_stdout): + sshuttle.helpers.debug1("message") + assert mock_stdout.mock_calls == [] + assert mock_stderr.mock_calls == [] + + +@patch('sshuttle.helpers.logprefix', new='prefix: ') +@patch('sshuttle.helpers.verbose', new=2) +@patch('sshuttle.helpers.sys.stdout') +@patch('sshuttle.helpers.sys.stderr') +def test_debug2(mock_stderr, mock_stdout): + sshuttle.helpers.debug2("message") + assert mock_stdout.mock_calls == [ + call.flush(), + ] + assert mock_stderr.mock_calls == [ + call.write('prefix: message'), + call.flush(), + ] + + +@patch('sshuttle.helpers.logprefix', new='prefix: ') +@patch('sshuttle.helpers.verbose', new=1) +@patch('sshuttle.helpers.sys.stdout') +@patch('sshuttle.helpers.sys.stderr') +def test_debug2_nop(mock_stderr, mock_stdout): + sshuttle.helpers.debug2("message") + assert mock_stdout.mock_calls == [] + assert mock_stderr.mock_calls == [] + + +@patch('sshuttle.helpers.logprefix', new='prefix: ') +@patch('sshuttle.helpers.verbose', new=3) +@patch('sshuttle.helpers.sys.stdout') +@patch('sshuttle.helpers.sys.stderr') +def test_debug3(mock_stderr, mock_stdout): + sshuttle.helpers.debug3("message") + assert mock_stdout.mock_calls == [ + call.flush(), + ] + assert mock_stderr.mock_calls == [ + call.write('prefix: message'), + call.flush(), + ] + + +@patch('sshuttle.helpers.logprefix', new='prefix: ') +@patch('sshuttle.helpers.verbose', new=2) +@patch('sshuttle.helpers.sys.stdout') +@patch('sshuttle.helpers.sys.stderr') +def test_debug3_nop(mock_stderr, mock_stdout): + sshuttle.helpers.debug3("message") + assert mock_stdout.mock_calls == [] + assert mock_stderr.mock_calls == [] + + +@patch('sshuttle.helpers.open', create=True) +def test_resolvconf_nameservers(mock_open): + mock_open.return_value = io.StringIO(u""" +# Generated by NetworkManager +search pri +nameserver 192.168.1.1 +nameserver 192.168.2.1 +nameserver 192.168.3.1 +nameserver 192.168.4.1 +nameserver 2404:6800:4004:80c::1 +nameserver 2404:6800:4004:80c::2 +nameserver 2404:6800:4004:80c::3 +nameserver 2404:6800:4004:80c::4 +""") + + ns = sshuttle.helpers.resolvconf_nameservers() + assert ns == [ + (AF_INET, u'192.168.1.1'), (AF_INET, u'192.168.2.1'), + (AF_INET, u'192.168.3.1'), (AF_INET, u'192.168.4.1'), + (AF_INET6, u'2404:6800:4004:80c::1'), + (AF_INET6, u'2404:6800:4004:80c::2'), + (AF_INET6, u'2404:6800:4004:80c::3'), + (AF_INET6, u'2404:6800:4004:80c::4') + ] + + +@patch('sshuttle.helpers.open', create=True) +def test_resolvconf_random_nameserver(mock_open): + mock_open.return_value = io.StringIO(u""" +# Generated by NetworkManager +search pri +nameserver 192.168.1.1 +nameserver 192.168.2.1 +nameserver 192.168.3.1 +nameserver 192.168.4.1 +nameserver 2404:6800:4004:80c::1 +nameserver 2404:6800:4004:80c::2 +nameserver 2404:6800:4004:80c::3 +nameserver 2404:6800:4004:80c::4 +""") + ns = sshuttle.helpers.resolvconf_random_nameserver() + assert ns in [ + (AF_INET, u'192.168.1.1'), (AF_INET, u'192.168.2.1'), + (AF_INET, u'192.168.3.1'), (AF_INET, u'192.168.4.1'), + (AF_INET6, u'2404:6800:4004:80c::1'), + (AF_INET6, u'2404:6800:4004:80c::2'), + (AF_INET6, u'2404:6800:4004:80c::3'), + (AF_INET6, u'2404:6800:4004:80c::4') + ] + + +@patch('sshuttle.helpers.socket.socket.bind') +def test_islocal(mock_bind): + bind_error = socket.error(errno.EADDRNOTAVAIL) + mock_bind.side_effect = [None, bind_error, None, bind_error] + + assert sshuttle.helpers.islocal("127.0.0.1", AF_INET) + assert not sshuttle.helpers.islocal("192.0.2.1", AF_INET) + assert sshuttle.helpers.islocal("::1", AF_INET6) + assert not sshuttle.helpers.islocal("2001:db8::1", AF_INET6) + + +def test_family_ip_tuple(): + assert sshuttle.helpers.family_ip_tuple("127.0.0.1") \ + == (AF_INET, "127.0.0.1") + assert sshuttle.helpers.family_ip_tuple("192.168.2.6") \ + == (AF_INET, "192.168.2.6") + assert sshuttle.helpers.family_ip_tuple("::1") \ + == (AF_INET6, "::1") + assert sshuttle.helpers.family_ip_tuple("2404:6800:4004:80c::1") \ + == (AF_INET6, "2404:6800:4004:80c::1") + + +def test_family_to_string(): + assert sshuttle.helpers.family_to_string(AF_INET) == "AF_INET" + assert sshuttle.helpers.family_to_string(AF_INET6) == "AF_INET6" + if sys.version_info < (3, 0): + expected = "1" + assert sshuttle.helpers.family_to_string(socket.AF_UNIX) == "1" + else: + expected = 'AddressFamily.AF_UNIX' + assert sshuttle.helpers.family_to_string(socket.AF_UNIX) == expected diff --git a/tests/client/test_methods_nat.py b/tests/client/test_methods_nat.py new file mode 100644 index 0000000..b18c05e --- /dev/null +++ b/tests/client/test_methods_nat.py @@ -0,0 +1,162 @@ +import pytest +from mock import Mock, patch, call +import socket +from socket import AF_INET, AF_INET6 +import struct + +from sshuttle.helpers import Fatal +from sshuttle.methods import get_method + + +def test_get_supported_features(): + method = get_method('nat') + features = method.get_supported_features() + assert not features.ipv6 + assert not features.udp + assert features.dns + + +def test_get_tcp_dstip(): + sock = Mock() + sock.getsockopt.return_value = struct.pack( + '!HHBBBB', socket.ntohs(AF_INET), 1024, 127, 0, 0, 1) + method = get_method('nat') + assert method.get_tcp_dstip(sock) == ('127.0.0.1', 1024) + assert sock.mock_calls == [call.getsockopt(0, 80, 16)] + + +def test_recv_udp(): + sock = Mock() + sock.recvfrom.return_value = "11111", "127.0.0.1" + method = get_method('nat') + result = method.recv_udp(sock, 1024) + assert sock.mock_calls == [call.recvfrom(1024)] + assert result == ("127.0.0.1", None, "11111") + + +def test_send_udp(): + sock = Mock() + method = get_method('nat') + method.send_udp(sock, None, "127.0.0.1", "22222") + assert sock.mock_calls == [call.sendto("22222", "127.0.0.1")] + + +def test_setup_tcp_listener(): + listener = Mock() + method = get_method('nat') + method.setup_tcp_listener(listener) + assert listener.mock_calls == [] + + +def test_setup_udp_listener(): + listener = Mock() + method = get_method('nat') + method.setup_udp_listener(listener) + assert listener.mock_calls == [] + + +def test_assert_features(): + method = get_method('nat') + features = method.get_supported_features() + method.assert_features(features) + + features.udp = True + with pytest.raises(Fatal): + method.assert_features(features) + + features.ipv6 = True + with pytest.raises(Fatal): + method.assert_features(features) + + +def test_firewall_command(): + method = get_method('nat') + assert not method.firewall_command("somthing") + + +@patch('sshuttle.methods.nat.ipt') +@patch('sshuttle.methods.nat.ipt_ttl') +@patch('sshuttle.methods.nat.ipt_chain_exists') +def test_setup_firewall(mock_ipt_chain_exists, mock_ipt_ttl, mock_ipt): + mock_ipt_chain_exists.return_value = True + method = get_method('nat') + assert method.name == 'nat' + + with pytest.raises(Exception) as excinfo: + method.setup_firewall( + 1024, 1026, + [(AF_INET6, u'2404:6800:4004:80c::33')], + AF_INET6, + [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 0, 0), + (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 80, 80)], + True, + None) + assert str(excinfo.value) \ + == 'Address family "AF_INET6" unsupported by nat method_name' + assert mock_ipt_chain_exists.mock_calls == [] + assert mock_ipt_ttl.mock_calls == [] + assert mock_ipt.mock_calls == [] + + with pytest.raises(Exception) as excinfo: + method.setup_firewall( + 1025, 1027, + [(AF_INET, u'1.2.3.33')], + AF_INET, + [(AF_INET, 24, False, u'1.2.3.0', 8000, 9000), + (AF_INET, 32, True, u'1.2.3.66', 8080, 8080)], + True, + None) + assert str(excinfo.value) == 'UDP not supported by nat method_name' + assert mock_ipt_chain_exists.mock_calls == [] + assert mock_ipt_ttl.mock_calls == [] + assert mock_ipt.mock_calls == [] + + method.setup_firewall( + 1025, 1027, + [(AF_INET, u'1.2.3.33')], + AF_INET, + [(AF_INET, 24, False, u'1.2.3.0', 8000, 9000), + (AF_INET, 32, True, u'1.2.3.66', 8080, 8080)], + False, + None) + assert mock_ipt_chain_exists.mock_calls == [ + call(AF_INET, 'nat', 'sshuttle-1025') + ] + assert mock_ipt_ttl.mock_calls == [ + call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'REDIRECT', + '--dest', u'1.2.3.0/24', '-p', 'tcp', '--dport', '8000:9000', + '--to-ports', '1025'), + call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'REDIRECT', + '--dest', u'1.2.3.33/32', '-p', 'udp', + '--dport', '53', '--to-ports', '1027') + ] + assert mock_ipt.mock_calls == [ + call(AF_INET, 'nat', '-D', 'OUTPUT', '-j', 'sshuttle-1025'), + call(AF_INET, 'nat', '-D', 'PREROUTING', '-j', 'sshuttle-1025'), + call(AF_INET, 'nat', '-F', 'sshuttle-1025'), + call(AF_INET, 'nat', '-X', 'sshuttle-1025'), + call(AF_INET, 'nat', '-N', 'sshuttle-1025'), + call(AF_INET, 'nat', '-F', 'sshuttle-1025'), + call(AF_INET, 'nat', '-I', 'OUTPUT', '1', '-j', 'sshuttle-1025'), + call(AF_INET, 'nat', '-I', 'PREROUTING', '1', '-j', 'sshuttle-1025'), + call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'RETURN', + '--dest', u'1.2.3.66/32', '-p', 'tcp', '--dport', '8080:8080') + ] + mock_ipt_chain_exists.reset_mock() + mock_ipt_ttl.reset_mock() + mock_ipt.reset_mock() + + method.restore_firewall(1025, AF_INET, False, None) + assert mock_ipt_chain_exists.mock_calls == [ + call(AF_INET, 'nat', 'sshuttle-1025') + ] + assert mock_ipt_ttl.mock_calls == [] + assert mock_ipt.mock_calls == [ + call(AF_INET, 'nat', '-D', 'OUTPUT', '-j', 'sshuttle-1025'), + call(AF_INET, 'nat', '-D', 'PREROUTING', '-j', 'sshuttle-1025'), + call(AF_INET, 'nat', '-F', 'sshuttle-1025'), + call(AF_INET, 'nat', '-X', 'sshuttle-1025') + ] + mock_ipt_chain_exists.reset_mock() + mock_ipt_ttl.reset_mock() + mock_ipt.reset_mock() diff --git a/tests/client/test_methods_pf.py b/tests/client/test_methods_pf.py new file mode 100644 index 0000000..e965c62 --- /dev/null +++ b/tests/client/test_methods_pf.py @@ -0,0 +1,487 @@ +import pytest +from mock import Mock, patch, call, ANY +import socket +from socket import AF_INET, AF_INET6 + +from sshuttle.methods import get_method +from sshuttle.helpers import Fatal +from sshuttle.methods.pf import FreeBsd, Darwin, OpenBsd + + +def test_get_supported_features(): + method = get_method('pf') + features = method.get_supported_features() + assert features.ipv6 + assert not features.udp + assert features.dns + + +@patch('sshuttle.helpers.verbose', new=3) +def test_get_tcp_dstip(): + sock = Mock() + sock.getpeername.return_value = ("127.0.0.1", 1024) + sock.getsockname.return_value = ("127.0.0.2", 1025) + sock.family = AF_INET + + firewall = Mock() + firewall.pfile.readline.return_value = \ + b"QUERY_PF_NAT_SUCCESS 127.0.0.3,1026\n" + + method = get_method('pf') + method.set_firewall(firewall) + assert method.get_tcp_dstip(sock) == ('127.0.0.3', 1026) + + assert sock.mock_calls == [ + call.getpeername(), + call.getsockname(), + ] + assert firewall.mock_calls == [ + call.pfile.write(b'QUERY_PF_NAT 2,6,127.0.0.1,1024,127.0.0.2,1025\n'), + call.pfile.flush(), + call.pfile.readline() + ] + + +def test_recv_udp(): + sock = Mock() + sock.recvfrom.return_value = "11111", "127.0.0.1" + method = get_method('pf') + result = method.recv_udp(sock, 1024) + assert sock.mock_calls == [call.recvfrom(1024)] + assert result == ("127.0.0.1", None, "11111") + + +def test_send_udp(): + sock = Mock() + method = get_method('pf') + method.send_udp(sock, None, "127.0.0.1", "22222") + assert sock.mock_calls == [call.sendto("22222", "127.0.0.1")] + + +def test_setup_tcp_listener(): + listener = Mock() + method = get_method('pf') + method.setup_tcp_listener(listener) + assert listener.mock_calls == [] + + +def test_setup_udp_listener(): + listener = Mock() + method = get_method('pf') + method.setup_udp_listener(listener) + assert listener.mock_calls == [] + + +def test_assert_features(): + method = get_method('pf') + features = method.get_supported_features() + method.assert_features(features) + + features.udp = True + with pytest.raises(Fatal): + method.assert_features(features) + + features.ipv6 = True + with pytest.raises(Fatal): + method.assert_features(features) + + +@patch('sshuttle.methods.pf.pf', Darwin()) +@patch('sshuttle.methods.pf.sys.stdout') +@patch('sshuttle.methods.pf.ioctl') +@patch('sshuttle.methods.pf.pf_get_dev') +def test_firewall_command_darwin(mock_pf_get_dev, mock_ioctl, mock_stdout): + method = get_method('pf') + assert not method.firewall_command("somthing") + + command = "QUERY_PF_NAT %d,%d,%s,%d,%s,%d\n" % ( + AF_INET, socket.IPPROTO_TCP, + "127.0.0.1", 1025, "127.0.0.2", 1024) + assert method.firewall_command(command) + + assert mock_pf_get_dev.mock_calls == [call()] + assert mock_ioctl.mock_calls == [ + call(mock_pf_get_dev(), 0xc0544417, ANY), + ] + assert mock_stdout.mock_calls == [ + call.write('QUERY_PF_NAT_SUCCESS 0.0.0.0,0\n'), + call.flush(), + ] + + +@patch('sshuttle.methods.pf.pf', FreeBsd()) +@patch('sshuttle.methods.pf.sys.stdout') +@patch('sshuttle.methods.pf.ioctl') +@patch('sshuttle.methods.pf.pf_get_dev') +def test_firewall_command_freebsd(mock_pf_get_dev, mock_ioctl, mock_stdout): + method = get_method('pf') + assert not method.firewall_command("somthing") + + command = "QUERY_PF_NAT %d,%d,%s,%d,%s,%d\n" % ( + AF_INET, socket.IPPROTO_TCP, + "127.0.0.1", 1025, "127.0.0.2", 1024) + assert method.firewall_command(command) + + assert mock_pf_get_dev.mock_calls == [call()] + assert mock_ioctl.mock_calls == [ + call(mock_pf_get_dev(), 0xc04c4417, ANY), + ] + assert mock_stdout.mock_calls == [ + call.write('QUERY_PF_NAT_SUCCESS 0.0.0.0,0\n'), + call.flush(), + ] + + +@patch('sshuttle.methods.pf.pf', OpenBsd()) +@patch('sshuttle.methods.pf.sys.stdout') +@patch('sshuttle.methods.pf.ioctl') +@patch('sshuttle.methods.pf.pf_get_dev') +def test_firewall_command_openbsd(mock_pf_get_dev, mock_ioctl, mock_stdout): + method = get_method('pf') + assert not method.firewall_command("somthing") + + command = "QUERY_PF_NAT %d,%d,%s,%d,%s,%d\n" % ( + AF_INET, socket.IPPROTO_TCP, + "127.0.0.1", 1025, "127.0.0.2", 1024) + assert method.firewall_command(command) + + assert mock_pf_get_dev.mock_calls == [call()] + assert mock_ioctl.mock_calls == [ + call(mock_pf_get_dev(), 0xc0504417, ANY), + ] + assert mock_stdout.mock_calls == [ + call.write('QUERY_PF_NAT_SUCCESS 0.0.0.0,0\n'), + call.flush(), + ] + + +def pfctl(args, stdin=None): + if args == '-s Interfaces -i lo -v': + return (b'lo0 (skip)',) + if args == '-s all': + return (b'INFO:\nStatus: Disabled\nanother mary had a little lamb\n', + b'little lamb\n') + if args == '-E': + return (b'\n', b'Token : abcdefg\n') + return None + + +@patch('sshuttle.helpers.verbose', new=3) +@patch('sshuttle.methods.pf.pf', Darwin()) +@patch('sshuttle.methods.pf.pfctl') +@patch('sshuttle.methods.pf.ioctl') +@patch('sshuttle.methods.pf.pf_get_dev') +def test_setup_firewall_darwin(mock_pf_get_dev, mock_ioctl, mock_pfctl): + mock_pfctl.side_effect = pfctl + + method = get_method('pf') + assert method.name == 'pf' + + # IPV6 + + method.setup_firewall( + 1024, 1026, + [(AF_INET6, u'2404:6800:4004:80c::33')], + AF_INET6, + [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 8000, 9000), + (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 8080, 8080)], + False, + None) + assert mock_ioctl.mock_calls == [ + call(mock_pf_get_dev(), 0xC4704433, ANY), + call(mock_pf_get_dev(), 0xCC20441A, ANY), + call(mock_pf_get_dev(), 0xCC20441A, ANY), + call(mock_pf_get_dev(), 0xC4704433, ANY), + call(mock_pf_get_dev(), 0xCC20441A, ANY), + call(mock_pf_get_dev(), 0xCC20441A, ANY), + ] + assert mock_pfctl.mock_calls == [ + call('-s Interfaces -i lo -v'), + call('-f /dev/stdin', b'pass on lo\n'), + call('-s all'), + call('-a sshuttle6-1024 -f /dev/stdin', + b'table <dns_servers> {2404:6800:4004:80c::33}\n' + b'rdr pass on lo0 inet6 proto tcp from ! ::1 to ' + b'2404:6800:4004:80c::/64 port 8000:9000 -> ::1 port 1024\n' + b'rdr pass on lo0 inet6 proto udp ' + b'to <dns_servers> port 53 -> ::1 port 1026\n' + b'pass out quick inet6 proto tcp to ' + b'2404:6800:4004:80c::101f/128 port 8080:8080\n' + b'pass out route-to lo0 inet6 proto tcp to ' + b'2404:6800:4004:80c::/64 port 8000:9000 keep state\n' + b'pass out route-to lo0 inet6 proto udp ' + b'to <dns_servers> port 53 keep state\n'), + call('-E'), + ] + mock_pf_get_dev.reset_mock() + mock_ioctl.reset_mock() + mock_pfctl.reset_mock() + + with pytest.raises(Exception) as excinfo: + method.setup_firewall( + 1025, 1027, + [(AF_INET, u'1.2.3.33')], + AF_INET, + [(AF_INET, 24, False, u'1.2.3.0', 0, 0), + (AF_INET, 32, True, u'1.2.3.66', 80, 80)], + True, + None) + assert str(excinfo.value) == 'UDP not supported by pf method_name' + assert mock_pf_get_dev.mock_calls == [] + assert mock_ioctl.mock_calls == [] + assert mock_pfctl.mock_calls == [] + + method.setup_firewall( + 1025, 1027, + [(AF_INET, u'1.2.3.33')], + AF_INET, + [(AF_INET, 24, False, u'1.2.3.0', 0, 0), + (AF_INET, 32, True, u'1.2.3.66', 80, 80)], + False, + None) + assert mock_ioctl.mock_calls == [ + call(mock_pf_get_dev(), 0xC4704433, ANY), + call(mock_pf_get_dev(), 0xCC20441A, ANY), + call(mock_pf_get_dev(), 0xCC20441A, ANY), + call(mock_pf_get_dev(), 0xC4704433, ANY), + call(mock_pf_get_dev(), 0xCC20441A, ANY), + call(mock_pf_get_dev(), 0xCC20441A, ANY), + ] + assert mock_pfctl.mock_calls == [ + call('-s Interfaces -i lo -v'), + call('-f /dev/stdin', b'pass on lo\n'), + call('-s all'), + call('-a sshuttle-1025 -f /dev/stdin', + b'table <dns_servers> {1.2.3.33}\n' + b'rdr pass on lo0 inet proto tcp from ! 127.0.0.1 to 1.2.3.0/24 ' + b'-> 127.0.0.1 port 1025\n' + b'rdr pass on lo0 inet proto udp ' + b'to <dns_servers> port 53 -> 127.0.0.1 port 1027\n' + b'pass out quick inet proto tcp to 1.2.3.66/32 port 80:80\n' + b'pass out route-to lo0 inet proto tcp to 1.2.3.0/24 keep state\n' + b'pass out route-to lo0 inet proto udp ' + b'to <dns_servers> port 53 keep state\n'), + call('-E'), + ] + mock_pf_get_dev.reset_mock() + mock_ioctl.reset_mock() + mock_pfctl.reset_mock() + + method.restore_firewall(1025, AF_INET, False, None) + assert mock_ioctl.mock_calls == [] + assert mock_pfctl.mock_calls == [ + call('-a sshuttle-1025 -F all'), + call("-X abcdefg"), + ] + mock_pf_get_dev.reset_mock() + mock_pfctl.reset_mock() + mock_ioctl.reset_mock() + + +@patch('sshuttle.helpers.verbose', new=3) +@patch('sshuttle.methods.pf.pf', FreeBsd()) +@patch('subprocess.call') +@patch('sshuttle.methods.pf.pfctl') +@patch('sshuttle.methods.pf.ioctl') +@patch('sshuttle.methods.pf.pf_get_dev') +def test_setup_firewall_freebsd(mock_pf_get_dev, mock_ioctl, mock_pfctl, + mock_subprocess_call): + mock_pfctl.side_effect = pfctl + + method = get_method('pf') + assert method.name == 'pf' + + method.setup_firewall( + 1024, 1026, + [(AF_INET6, u'2404:6800:4004:80c::33')], + AF_INET6, + [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 8000, 9000), + (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 8080, 8080)], + False, + None) + + assert mock_pfctl.mock_calls == [ + call('-s all'), + call('-a sshuttle6-1024 -f /dev/stdin', + b'table <dns_servers> {2404:6800:4004:80c::33}\n' + b'rdr pass on lo0 inet6 proto tcp from ! ::1 to ' + b'2404:6800:4004:80c::/64 port 8000:9000 -> ::1 port 1024\n' + b'rdr pass on lo0 inet6 proto udp ' + b'to <dns_servers> port 53 -> ::1 port 1026\n' + b'pass out quick inet6 proto tcp to ' + b'2404:6800:4004:80c::101f/128 port 8080:8080\n' + b'pass out route-to lo0 inet6 proto tcp to ' + b'2404:6800:4004:80c::/64 port 8000:9000 keep state\n' + b'pass out route-to lo0 inet6 proto udp ' + b'to <dns_servers> port 53 keep state\n'), + call('-e'), + ] + assert call(['kldload', 'pf']) in mock_subprocess_call.mock_calls + mock_pf_get_dev.reset_mock() + mock_ioctl.reset_mock() + mock_pfctl.reset_mock() + + with pytest.raises(Exception) as excinfo: + method.setup_firewall( + 1025, 1027, + [(AF_INET, u'1.2.3.33')], + AF_INET, + [(AF_INET, 24, False, u'1.2.3.0', 0, 0), + (AF_INET, 32, True, u'1.2.3.66', 80, 80)], + True, + None) + assert str(excinfo.value) == 'UDP not supported by pf method_name' + assert mock_pf_get_dev.mock_calls == [] + assert mock_ioctl.mock_calls == [] + assert mock_pfctl.mock_calls == [] + + method.setup_firewall( + 1025, 1027, + [(AF_INET, u'1.2.3.33')], + AF_INET, + [(AF_INET, 24, False, u'1.2.3.0', 0, 0), + (AF_INET, 32, True, u'1.2.3.66', 80, 80)], + False, + None) + assert mock_ioctl.mock_calls == [ + call(mock_pf_get_dev(), 0xC4704433, ANY), + call(mock_pf_get_dev(), 0xCBE0441A, ANY), + call(mock_pf_get_dev(), 0xCBE0441A, ANY), + call(mock_pf_get_dev(), 0xC4704433, ANY), + call(mock_pf_get_dev(), 0xCBE0441A, ANY), + call(mock_pf_get_dev(), 0xCBE0441A, ANY), + ] + assert mock_pfctl.mock_calls == [ + call('-s all'), + call('-a sshuttle-1025 -f /dev/stdin', + b'table <dns_servers> {1.2.3.33}\n' + b'rdr pass on lo0 inet proto tcp from ! 127.0.0.1 ' + b'to 1.2.3.0/24 -> 127.0.0.1 port 1025\n' + b'rdr pass on lo0 inet proto udp ' + b'to <dns_servers> port 53 -> 127.0.0.1 port 1027\n' + b'pass out quick inet proto tcp to 1.2.3.66/32 port 80:80\n' + b'pass out route-to lo0 inet proto tcp to 1.2.3.0/24 keep state\n' + b'pass out route-to lo0 inet proto udp ' + b'to <dns_servers> port 53 keep state\n'), + call('-e'), + ] + mock_pf_get_dev.reset_mock() + mock_ioctl.reset_mock() + mock_pfctl.reset_mock() + + method.restore_firewall(1025, AF_INET, False, None) + method.restore_firewall(1024, AF_INET6, False, None) + assert mock_ioctl.mock_calls == [] + assert mock_pfctl.mock_calls == [ + call('-a sshuttle-1025 -F all'), + call('-a sshuttle6-1024 -F all'), + call("-d"), + ] + mock_pf_get_dev.reset_mock() + mock_pfctl.reset_mock() + mock_ioctl.reset_mock() + + +@patch('sshuttle.helpers.verbose', new=3) +@patch('sshuttle.methods.pf.pf', OpenBsd()) +@patch('sshuttle.methods.pf.pfctl') +@patch('sshuttle.methods.pf.ioctl') +@patch('sshuttle.methods.pf.pf_get_dev') +def test_setup_firewall_openbsd(mock_pf_get_dev, mock_ioctl, mock_pfctl): + mock_pfctl.side_effect = pfctl + + method = get_method('pf') + assert method.name == 'pf' + + method.setup_firewall( + 1024, 1026, + [(AF_INET6, u'2404:6800:4004:80c::33')], + AF_INET6, + [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 8000, 9000), + (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 8080, 8080)], + False, + None) + + assert mock_ioctl.mock_calls == [ + call(mock_pf_get_dev(), 0xcd48441a, ANY), + call(mock_pf_get_dev(), 0xcd48441a, ANY), + ] + assert mock_pfctl.mock_calls == [ + call('-s Interfaces -i lo -v'), + call('-f /dev/stdin', b'match on lo\n'), + call('-s all'), + call('-a sshuttle6-1024 -f /dev/stdin', + b'table <dns_servers> {2404:6800:4004:80c::33}\n' + b'pass in on lo0 inet6 proto tcp to 2404:6800:4004:80c::/64 ' + b'port 8000:9000 divert-to ::1 port 1024\n' + b'pass in on lo0 inet6 proto udp ' + b'to <dns_servers> port 53 rdr-to ::1 port 1026\n' + b'pass out quick inet6 proto tcp to ' + b'2404:6800:4004:80c::101f/128 port 8080:8080\n' + b'pass out inet6 proto tcp to 2404:6800:4004:80c::/64 ' + b'port 8000:9000 route-to lo0 keep state\n' + b'pass out inet6 proto udp to ' + b'<dns_servers> port 53 route-to lo0 keep state\n'), + call('-e'), + ] + mock_pf_get_dev.reset_mock() + mock_ioctl.reset_mock() + mock_pfctl.reset_mock() + + with pytest.raises(Exception) as excinfo: + method.setup_firewall( + 1025, 1027, + [(AF_INET, u'1.2.3.33')], + AF_INET, + [(AF_INET, 24, False, u'1.2.3.0', 0, 0), + (AF_INET, 32, True, u'1.2.3.66', 80, 80)], + True, + None) + assert str(excinfo.value) == 'UDP not supported by pf method_name' + assert mock_pf_get_dev.mock_calls == [] + assert mock_ioctl.mock_calls == [] + assert mock_pfctl.mock_calls == [] + + method.setup_firewall( + 1025, 1027, + [(AF_INET, u'1.2.3.33')], + AF_INET, + [(AF_INET, 24, False, u'1.2.3.0', 0, 0), + (AF_IN |