summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorBastian Venthur <bastian.venthur@flixbus.com>2018-10-15 11:56:39 +0200
committerBrian May <brian@linuxpenguins.xyz>2018-10-17 20:52:04 +1100
commit842768f9cf6c3ce6638c1b741ae8f8b2a79ae66a (patch)
treead0abfa3a03aa3038b061723ba9c03e857663104 /tests
parent97ed2030f340113908fa218f90964767415c55ab (diff)
Moved sshuttle/tests into tests to.
Having the tests in a `tests` directory in root is the most common approach. Also moved pytest's conftest.py into `tests` making the fixture available for client and server tests.
Diffstat (limited to 'tests')
-rw-r--r--tests/client/test_firewall.py136
-rw-r--r--tests/client/test_helpers.py201
-rw-r--r--tests/client/test_methods_nat.py162
-rw-r--r--tests/client/test_methods_pf.py487
-rw-r--r--tests/client/test_methods_tproxy.py297
-rw-r--r--tests/client/test_options.py98
-rw-r--r--tests/client/test_sdnotify.py64
-rw-r--r--tests/conftest.py10
-rw-r--r--tests/server/test_server.py58
9 files changed, 1513 insertions, 0 deletions
diff --git a/tests/client/test_firewall.py b/tests/client/test_firewall.py
new file mode 100644
index 0000000..3174e9c
--- /dev/null
+++ b/tests/client/test_firewall.py
@@ -0,0 +1,136 @@
+from mock import Mock, patch, call
+import io
+from socket import AF_INET, AF_INET6
+
+import sshuttle.firewall
+
+
+def setup_daemon():
+ stdin = io.StringIO(u"""ROUTES
+{inet},24,0,1.2.3.0,8000,9000
+{inet},32,1,1.2.3.66,8080,8080
+{inet6},64,0,2404:6800:4004:80c::,0,0
+{inet6},128,1,2404:6800:4004:80c::101f,80,80
+NSLIST
+{inet},1.2.3.33
+{inet6},2404:6800:4004:80c::33
+PORTS 1024,1025,1026,1027
+GO 1 -
+HOST 1.2.3.3,existing
+""".format(inet=AF_INET, inet6=AF_INET6))
+ stdout = Mock()
+ return stdin, stdout
+
+
+def test_rewrite_etc_hosts(tmpdir):
+ orig_hosts = tmpdir.join("hosts.orig")
+ orig_hosts.write("1.2.3.3 existing\n")
+
+ new_hosts = tmpdir.join("hosts")
+ orig_hosts.copy(new_hosts)
+
+ hostmap = {
+ 'myhost': '1.2.3.4',
+ 'myotherhost': '1.2.3.5',
+ }
+ with patch('sshuttle.firewall.HOSTSFILE', new=str(new_hosts)):
+ sshuttle.firewall.rewrite_etc_hosts(hostmap, 10)
+
+ with new_hosts.open() as f:
+ line = f.readline()
+ s = line.split()
+ assert s == ['1.2.3.3', 'existing']
+
+ line = f.readline()
+ s = line.split()
+ assert s == ['1.2.3.4', 'myhost',
+ '#', 'sshuttle-firewall-10', 'AUTOCREATED']
+
+ line = f.readline()
+ s = line.split()
+ assert s == ['1.2.3.5', 'myotherhost',
+ '#', 'sshuttle-firewall-10', 'AUTOCREATED']
+
+ line = f.readline()
+ assert line == ""
+
+ with patch('sshuttle.firewall.HOSTSFILE', new=str(new_hosts)):
+ sshuttle.firewall.restore_etc_hosts(10)
+ assert orig_hosts.computehash() == new_hosts.computehash()
+
+
+def test_subnet_weight():
+ subnets = [
+ (AF_INET, 16, 0, '192.168.0.0', 0, 0),
+ (AF_INET, 24, 0, '192.168.69.0', 0, 0),
+ (AF_INET, 32, 0, '192.168.69.70', 0, 0),
+ (AF_INET, 32, 1, '192.168.69.70', 0, 0),
+ (AF_INET, 32, 1, '192.168.69.70', 80, 80),
+ (AF_INET, 0, 1, '0.0.0.0', 0, 0),
+ (AF_INET, 0, 1, '0.0.0.0', 8000, 9000),
+ (AF_INET, 0, 1, '0.0.0.0', 8000, 8500),
+ (AF_INET, 0, 1, '0.0.0.0', 8000, 8000),
+ (AF_INET, 0, 1, '0.0.0.0', 400, 450)
+ ]
+ subnets_sorted = [
+ (AF_INET, 32, 1, '192.168.69.70', 80, 80),
+ (AF_INET, 0, 1, '0.0.0.0', 8000, 8000),
+ (AF_INET, 0, 1, '0.0.0.0', 400, 450),
+ (AF_INET, 0, 1, '0.0.0.0', 8000, 8500),
+ (AF_INET, 0, 1, '0.0.0.0', 8000, 9000),
+ (AF_INET, 32, 1, '192.168.69.70', 0, 0),
+ (AF_INET, 32, 0, '192.168.69.70', 0, 0),
+ (AF_INET, 24, 0, '192.168.69.0', 0, 0),
+ (AF_INET, 16, 0, '192.168.0.0', 0, 0),
+ (AF_INET, 0, 1, '0.0.0.0', 0, 0)
+ ]
+
+ assert subnets_sorted == \
+ sorted(subnets, key=sshuttle.firewall.subnet_weight, reverse=True)
+
+
+@patch('sshuttle.firewall.rewrite_etc_hosts')
+@patch('sshuttle.firewall.setup_daemon')
+@patch('sshuttle.firewall.get_method')
+def test_main(mock_get_method, mock_setup_daemon, mock_rewrite_etc_hosts):
+ stdin, stdout = setup_daemon()
+ mock_setup_daemon.return_value = stdin, stdout
+
+ mock_get_method("not_auto").name = "test"
+ mock_get_method.reset_mock()
+
+ sshuttle.firewall.main("not_auto", False)
+
+ assert mock_rewrite_etc_hosts.mock_calls == [
+ call({'1.2.3.3': 'existing'}, 1024),
+ call({}, 1024),
+ ]
+
+ assert stdout.mock_calls == [
+ call.write('READY test\n'),
+ call.flush(),
+ call.write('STARTED\n'),
+ call.flush()
+ ]
+ assert mock_setup_daemon.mock_calls == [call()]
+ assert mock_get_method.mock_calls == [
+ call('not_auto'),
+ call().setup_firewall(
+ 1024, 1026,
+ [(AF_INET6, u'2404:6800:4004:80c::33')],
+ AF_INET6,
+ [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 0, 0),
+ (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 80, 80)],
+ True,
+ None),
+ call().setup_firewall(
+ 1025, 1027,
+ [(AF_INET, u'1.2.3.33')],
+ AF_INET,
+ [(AF_INET, 24, False, u'1.2.3.0', 8000, 9000),
+ (AF_INET, 32, True, u'1.2.3.66', 8080, 8080)],
+ True,
+ None),
+ call().restore_firewall(1024, AF_INET6, True, None),
+ call().restore_firewall(1025, AF_INET, True, None),
+ ]
diff --git a/tests/client/test_helpers.py b/tests/client/test_helpers.py
new file mode 100644
index 0000000..7359ec2
--- /dev/null
+++ b/tests/client/test_helpers.py
@@ -0,0 +1,201 @@
+from mock import patch, call
+import sys
+import io
+import socket
+from socket import AF_INET, AF_INET6
+import errno
+
+import sshuttle.helpers
+
+
+@patch('sshuttle.helpers.logprefix', new='prefix: ')
+@patch('sshuttle.helpers.sys.stdout')
+@patch('sshuttle.helpers.sys.stderr')
+def test_log(mock_stderr, mock_stdout):
+ sshuttle.helpers.log("message")
+ sshuttle.helpers.log("abc")
+ sshuttle.helpers.log("message 1\n")
+ sshuttle.helpers.log("message 2\nline2\nline3\n")
+ sshuttle.helpers.log("message 3\nline2\nline3")
+ assert mock_stdout.mock_calls == [
+ call.flush(),
+ call.flush(),
+ call.flush(),
+ call.flush(),
+ call.flush(),
+ ]
+ assert mock_stderr.mock_calls == [
+ call.write('prefix: message'),
+ call.flush(),
+ call.write('prefix: abc'),
+ call.flush(),
+ call.write('prefix: message 1\n'),
+ call.flush(),
+ call.write('prefix: message 2\n'),
+ call.write('---> line2\n'),
+ call.write('---> line3\n'),
+ call.flush(),
+ call.write('prefix: message 3\n'),
+ call.write('---> line2\n'),
+ call.write('---> line3\n'),
+ call.flush(),
+ ]
+
+
+@patch('sshuttle.helpers.logprefix', new='prefix: ')
+@patch('sshuttle.helpers.verbose', new=1)
+@patch('sshuttle.helpers.sys.stdout')
+@patch('sshuttle.helpers.sys.stderr')
+def test_debug1(mock_stderr, mock_stdout):
+ sshuttle.helpers.debug1("message")
+ assert mock_stdout.mock_calls == [
+ call.flush(),
+ ]
+ assert mock_stderr.mock_calls == [
+ call.write('prefix: message'),
+ call.flush(),
+ ]
+
+
+@patch('sshuttle.helpers.logprefix', new='prefix: ')
+@patch('sshuttle.helpers.verbose', new=0)
+@patch('sshuttle.helpers.sys.stdout')
+@patch('sshuttle.helpers.sys.stderr')
+def test_debug1_nop(mock_stderr, mock_stdout):
+ sshuttle.helpers.debug1("message")
+ assert mock_stdout.mock_calls == []
+ assert mock_stderr.mock_calls == []
+
+
+@patch('sshuttle.helpers.logprefix', new='prefix: ')
+@patch('sshuttle.helpers.verbose', new=2)
+@patch('sshuttle.helpers.sys.stdout')
+@patch('sshuttle.helpers.sys.stderr')
+def test_debug2(mock_stderr, mock_stdout):
+ sshuttle.helpers.debug2("message")
+ assert mock_stdout.mock_calls == [
+ call.flush(),
+ ]
+ assert mock_stderr.mock_calls == [
+ call.write('prefix: message'),
+ call.flush(),
+ ]
+
+
+@patch('sshuttle.helpers.logprefix', new='prefix: ')
+@patch('sshuttle.helpers.verbose', new=1)
+@patch('sshuttle.helpers.sys.stdout')
+@patch('sshuttle.helpers.sys.stderr')
+def test_debug2_nop(mock_stderr, mock_stdout):
+ sshuttle.helpers.debug2("message")
+ assert mock_stdout.mock_calls == []
+ assert mock_stderr.mock_calls == []
+
+
+@patch('sshuttle.helpers.logprefix', new='prefix: ')
+@patch('sshuttle.helpers.verbose', new=3)
+@patch('sshuttle.helpers.sys.stdout')
+@patch('sshuttle.helpers.sys.stderr')
+def test_debug3(mock_stderr, mock_stdout):
+ sshuttle.helpers.debug3("message")
+ assert mock_stdout.mock_calls == [
+ call.flush(),
+ ]
+ assert mock_stderr.mock_calls == [
+ call.write('prefix: message'),
+ call.flush(),
+ ]
+
+
+@patch('sshuttle.helpers.logprefix', new='prefix: ')
+@patch('sshuttle.helpers.verbose', new=2)
+@patch('sshuttle.helpers.sys.stdout')
+@patch('sshuttle.helpers.sys.stderr')
+def test_debug3_nop(mock_stderr, mock_stdout):
+ sshuttle.helpers.debug3("message")
+ assert mock_stdout.mock_calls == []
+ assert mock_stderr.mock_calls == []
+
+
+@patch('sshuttle.helpers.open', create=True)
+def test_resolvconf_nameservers(mock_open):
+ mock_open.return_value = io.StringIO(u"""
+# Generated by NetworkManager
+search pri
+nameserver 192.168.1.1
+nameserver 192.168.2.1
+nameserver 192.168.3.1
+nameserver 192.168.4.1
+nameserver 2404:6800:4004:80c::1
+nameserver 2404:6800:4004:80c::2
+nameserver 2404:6800:4004:80c::3
+nameserver 2404:6800:4004:80c::4
+""")
+
+ ns = sshuttle.helpers.resolvconf_nameservers()
+ assert ns == [
+ (AF_INET, u'192.168.1.1'), (AF_INET, u'192.168.2.1'),
+ (AF_INET, u'192.168.3.1'), (AF_INET, u'192.168.4.1'),
+ (AF_INET6, u'2404:6800:4004:80c::1'),
+ (AF_INET6, u'2404:6800:4004:80c::2'),
+ (AF_INET6, u'2404:6800:4004:80c::3'),
+ (AF_INET6, u'2404:6800:4004:80c::4')
+ ]
+
+
+@patch('sshuttle.helpers.open', create=True)
+def test_resolvconf_random_nameserver(mock_open):
+ mock_open.return_value = io.StringIO(u"""
+# Generated by NetworkManager
+search pri
+nameserver 192.168.1.1
+nameserver 192.168.2.1
+nameserver 192.168.3.1
+nameserver 192.168.4.1
+nameserver 2404:6800:4004:80c::1
+nameserver 2404:6800:4004:80c::2
+nameserver 2404:6800:4004:80c::3
+nameserver 2404:6800:4004:80c::4
+""")
+ ns = sshuttle.helpers.resolvconf_random_nameserver()
+ assert ns in [
+ (AF_INET, u'192.168.1.1'), (AF_INET, u'192.168.2.1'),
+ (AF_INET, u'192.168.3.1'), (AF_INET, u'192.168.4.1'),
+ (AF_INET6, u'2404:6800:4004:80c::1'),
+ (AF_INET6, u'2404:6800:4004:80c::2'),
+ (AF_INET6, u'2404:6800:4004:80c::3'),
+ (AF_INET6, u'2404:6800:4004:80c::4')
+ ]
+
+
+@patch('sshuttle.helpers.socket.socket.bind')
+def test_islocal(mock_bind):
+ bind_error = socket.error(errno.EADDRNOTAVAIL)
+ mock_bind.side_effect = [None, bind_error, None, bind_error]
+
+ assert sshuttle.helpers.islocal("127.0.0.1", AF_INET)
+ assert not sshuttle.helpers.islocal("192.0.2.1", AF_INET)
+ assert sshuttle.helpers.islocal("::1", AF_INET6)
+ assert not sshuttle.helpers.islocal("2001:db8::1", AF_INET6)
+
+
+def test_family_ip_tuple():
+ assert sshuttle.helpers.family_ip_tuple("127.0.0.1") \
+ == (AF_INET, "127.0.0.1")
+ assert sshuttle.helpers.family_ip_tuple("192.168.2.6") \
+ == (AF_INET, "192.168.2.6")
+ assert sshuttle.helpers.family_ip_tuple("::1") \
+ == (AF_INET6, "::1")
+ assert sshuttle.helpers.family_ip_tuple("2404:6800:4004:80c::1") \
+ == (AF_INET6, "2404:6800:4004:80c::1")
+
+
+def test_family_to_string():
+ assert sshuttle.helpers.family_to_string(AF_INET) == "AF_INET"
+ assert sshuttle.helpers.family_to_string(AF_INET6) == "AF_INET6"
+ if sys.version_info < (3, 0):
+ expected = "1"
+ assert sshuttle.helpers.family_to_string(socket.AF_UNIX) == "1"
+ else:
+ expected = 'AddressFamily.AF_UNIX'
+ assert sshuttle.helpers.family_to_string(socket.AF_UNIX) == expected
diff --git a/tests/client/test_methods_nat.py b/tests/client/test_methods_nat.py
new file mode 100644
index 0000000..b18c05e
--- /dev/null
+++ b/tests/client/test_methods_nat.py
@@ -0,0 +1,162 @@
+import pytest
+from mock import Mock, patch, call
+import socket
+from socket import AF_INET, AF_INET6
+import struct
+
+from sshuttle.helpers import Fatal
+from sshuttle.methods import get_method
+
+
+def test_get_supported_features():
+ method = get_method('nat')
+ features = method.get_supported_features()
+ assert not features.ipv6
+ assert not features.udp
+ assert features.dns
+
+
+def test_get_tcp_dstip():
+ sock = Mock()
+ sock.getsockopt.return_value = struct.pack(
+ '!HHBBBB', socket.ntohs(AF_INET), 1024, 127, 0, 0, 1)
+ method = get_method('nat')
+ assert method.get_tcp_dstip(sock) == ('127.0.0.1', 1024)
+ assert sock.mock_calls == [call.getsockopt(0, 80, 16)]
+
+
+def test_recv_udp():
+ sock = Mock()
+ sock.recvfrom.return_value = "11111", "127.0.0.1"
+ method = get_method('nat')
+ result = method.recv_udp(sock, 1024)
+ assert sock.mock_calls == [call.recvfrom(1024)]
+ assert result == ("127.0.0.1", None, "11111")
+
+
+def test_send_udp():
+ sock = Mock()
+ method = get_method('nat')
+ method.send_udp(sock, None, "127.0.0.1", "22222")
+ assert sock.mock_calls == [call.sendto("22222", "127.0.0.1")]
+
+
+def test_setup_tcp_listener():
+ listener = Mock()
+ method = get_method('nat')
+ method.setup_tcp_listener(listener)
+ assert listener.mock_calls == []
+
+
+def test_setup_udp_listener():
+ listener = Mock()
+ method = get_method('nat')
+ method.setup_udp_listener(listener)
+ assert listener.mock_calls == []
+
+
+def test_assert_features():
+ method = get_method('nat')
+ features = method.get_supported_features()
+ method.assert_features(features)
+
+ features.udp = True
+ with pytest.raises(Fatal):
+ method.assert_features(features)
+
+ features.ipv6 = True
+ with pytest.raises(Fatal):
+ method.assert_features(features)
+
+
+def test_firewall_command():
+ method = get_method('nat')
+ assert not method.firewall_command("somthing")
+
+
+@patch('sshuttle.methods.nat.ipt')
+@patch('sshuttle.methods.nat.ipt_ttl')
+@patch('sshuttle.methods.nat.ipt_chain_exists')
+def test_setup_firewall(mock_ipt_chain_exists, mock_ipt_ttl, mock_ipt):
+ mock_ipt_chain_exists.return_value = True
+ method = get_method('nat')
+ assert method.name == 'nat'
+
+ with pytest.raises(Exception) as excinfo:
+ method.setup_firewall(
+ 1024, 1026,
+ [(AF_INET6, u'2404:6800:4004:80c::33')],
+ AF_INET6,
+ [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 0, 0),
+ (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 80, 80)],
+ True,
+ None)
+ assert str(excinfo.value) \
+ == 'Address family "AF_INET6" unsupported by nat method_name'
+ assert mock_ipt_chain_exists.mock_calls == []
+ assert mock_ipt_ttl.mock_calls == []
+ assert mock_ipt.mock_calls == []
+
+ with pytest.raises(Exception) as excinfo:
+ method.setup_firewall(
+ 1025, 1027,
+ [(AF_INET, u'1.2.3.33')],
+ AF_INET,
+ [(AF_INET, 24, False, u'1.2.3.0', 8000, 9000),
+ (AF_INET, 32, True, u'1.2.3.66', 8080, 8080)],
+ True,
+ None)
+ assert str(excinfo.value) == 'UDP not supported by nat method_name'
+ assert mock_ipt_chain_exists.mock_calls == []
+ assert mock_ipt_ttl.mock_calls == []
+ assert mock_ipt.mock_calls == []
+
+ method.setup_firewall(
+ 1025, 1027,
+ [(AF_INET, u'1.2.3.33')],
+ AF_INET,
+ [(AF_INET, 24, False, u'1.2.3.0', 8000, 9000),
+ (AF_INET, 32, True, u'1.2.3.66', 8080, 8080)],
+ False,
+ None)
+ assert mock_ipt_chain_exists.mock_calls == [
+ call(AF_INET, 'nat', 'sshuttle-1025')
+ ]
+ assert mock_ipt_ttl.mock_calls == [
+ call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'REDIRECT',
+ '--dest', u'1.2.3.0/24', '-p', 'tcp', '--dport', '8000:9000',
+ '--to-ports', '1025'),
+ call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'REDIRECT',
+ '--dest', u'1.2.3.33/32', '-p', 'udp',
+ '--dport', '53', '--to-ports', '1027')
+ ]
+ assert mock_ipt.mock_calls == [
+ call(AF_INET, 'nat', '-D', 'OUTPUT', '-j', 'sshuttle-1025'),
+ call(AF_INET, 'nat', '-D', 'PREROUTING', '-j', 'sshuttle-1025'),
+ call(AF_INET, 'nat', '-F', 'sshuttle-1025'),
+ call(AF_INET, 'nat', '-X', 'sshuttle-1025'),
+ call(AF_INET, 'nat', '-N', 'sshuttle-1025'),
+ call(AF_INET, 'nat', '-F', 'sshuttle-1025'),
+ call(AF_INET, 'nat', '-I', 'OUTPUT', '1', '-j', 'sshuttle-1025'),
+ call(AF_INET, 'nat', '-I', 'PREROUTING', '1', '-j', 'sshuttle-1025'),
+ call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'RETURN',
+ '--dest', u'1.2.3.66/32', '-p', 'tcp', '--dport', '8080:8080')
+ ]
+ mock_ipt_chain_exists.reset_mock()
+ mock_ipt_ttl.reset_mock()
+ mock_ipt.reset_mock()
+
+ method.restore_firewall(1025, AF_INET, False, None)
+ assert mock_ipt_chain_exists.mock_calls == [
+ call(AF_INET, 'nat', 'sshuttle-1025')
+ ]
+ assert mock_ipt_ttl.mock_calls == []
+ assert mock_ipt.mock_calls == [
+ call(AF_INET, 'nat', '-D', 'OUTPUT', '-j', 'sshuttle-1025'),
+ call(AF_INET, 'nat', '-D', 'PREROUTING', '-j', 'sshuttle-1025'),
+ call(AF_INET, 'nat', '-F', 'sshuttle-1025'),
+ call(AF_INET, 'nat', '-X', 'sshuttle-1025')
+ ]
+ mock_ipt_chain_exists.reset_mock()
+ mock_ipt_ttl.reset_mock()
+ mock_ipt.reset_mock()
diff --git a/tests/client/test_methods_pf.py b/tests/client/test_methods_pf.py
new file mode 100644
index 0000000..e965c62
--- /dev/null
+++ b/tests/client/test_methods_pf.py
@@ -0,0 +1,487 @@
+import pytest
+from mock import Mock, patch, call, ANY
+import socket
+from socket import AF_INET, AF_INET6
+
+from sshuttle.methods import get_method
+from sshuttle.helpers import Fatal
+from sshuttle.methods.pf import FreeBsd, Darwin, OpenBsd
+
+
+def test_get_supported_features():
+ method = get_method('pf')
+ features = method.get_supported_features()
+ assert features.ipv6
+ assert not features.udp
+ assert features.dns
+
+
+@patch('sshuttle.helpers.verbose', new=3)
+def test_get_tcp_dstip():
+ sock = Mock()
+ sock.getpeername.return_value = ("127.0.0.1", 1024)
+ sock.getsockname.return_value = ("127.0.0.2", 1025)
+ sock.family = AF_INET
+
+ firewall = Mock()
+ firewall.pfile.readline.return_value = \
+ b"QUERY_PF_NAT_SUCCESS 127.0.0.3,1026\n"
+
+ method = get_method('pf')
+ method.set_firewall(firewall)
+ assert method.get_tcp_dstip(sock) == ('127.0.0.3', 1026)
+
+ assert sock.mock_calls == [
+ call.getpeername(),
+ call.getsockname(),
+ ]
+ assert firewall.mock_calls == [
+ call.pfile.write(b'QUERY_PF_NAT 2,6,127.0.0.1,1024,127.0.0.2,1025\n'),
+ call.pfile.flush(),
+ call.pfile.readline()
+ ]
+
+
+def test_recv_udp():
+ sock = Mock()
+ sock.recvfrom.return_value = "11111", "127.0.0.1"
+ method = get_method('pf')
+ result = method.recv_udp(sock, 1024)
+ assert sock.mock_calls == [call.recvfrom(1024)]
+ assert result == ("127.0.0.1", None, "11111")
+
+
+def test_send_udp():
+ sock = Mock()
+ method = get_method('pf')
+ method.send_udp(sock, None, "127.0.0.1", "22222")
+ assert sock.mock_calls == [call.sendto("22222", "127.0.0.1")]
+
+
+def test_setup_tcp_listener():
+ listener = Mock()
+ method = get_method('pf')
+ method.setup_tcp_listener(listener)
+ assert listener.mock_calls == []
+
+
+def test_setup_udp_listener():
+ listener = Mock()
+ method = get_method('pf')
+ method.setup_udp_listener(listener)
+ assert listener.mock_calls == []
+
+
+def test_assert_features():
+ method = get_method('pf')
+ features = method.get_supported_features()
+ method.assert_features(features)
+
+ features.udp = True
+ with pytest.raises(Fatal):
+ method.assert_features(features)
+
+ features.ipv6 = True
+ with pytest.raises(Fatal):
+ method.assert_features(features)
+
+
+@patch('sshuttle.methods.pf.pf', Darwin())
+@patch('sshuttle.methods.pf.sys.stdout')
+@patch('sshuttle.methods.pf.ioctl')
+@patch('sshuttle.methods.pf.pf_get_dev')
+def test_firewall_command_darwin(mock_pf_get_dev, mock_ioctl, mock_stdout):
+ method = get_method('pf')
+ assert not method.firewall_command("somthing")
+
+ command = "QUERY_PF_NAT %d,%d,%s,%d,%s,%d\n" % (
+ AF_INET, socket.IPPROTO_TCP,
+ "127.0.0.1", 1025, "127.0.0.2", 1024)
+ assert method.firewall_command(command)
+
+ assert mock_pf_get_dev.mock_calls == [call()]
+ assert mock_ioctl.mock_calls == [
+ call(mock_pf_get_dev(), 0xc0544417, ANY),
+ ]
+ assert mock_stdout.mock_calls == [
+ call.write('QUERY_PF_NAT_SUCCESS 0.0.0.0,0\n'),
+ call.flush(),
+ ]
+
+
+@patch('sshuttle.methods.pf.pf', FreeBsd())
+@patch('sshuttle.methods.pf.sys.stdout')
+@patch('sshuttle.methods.pf.ioctl')
+@patch('sshuttle.methods.pf.pf_get_dev')
+def test_firewall_command_freebsd(mock_pf_get_dev, mock_ioctl, mock_stdout):
+ method = get_method('pf')
+ assert not method.firewall_command("somthing")
+
+ command = "QUERY_PF_NAT %d,%d,%s,%d,%s,%d\n" % (
+ AF_INET, socket.IPPROTO_TCP,
+ "127.0.0.1", 1025, "127.0.0.2", 1024)
+ assert method.firewall_command(command)
+
+ assert mock_pf_get_dev.mock_calls == [call()]
+ assert mock_ioctl.mock_calls == [
+ call(mock_pf_get_dev(), 0xc04c4417, ANY),
+ ]
+ assert mock_stdout.mock_calls == [
+ call.write('QUERY_PF_NAT_SUCCESS 0.0.0.0,0\n'),
+ call.flush(),
+ ]
+
+
+@patch('sshuttle.methods.pf.pf', OpenBsd())
+@patch('sshuttle.methods.pf.sys.stdout')
+@patch('sshuttle.methods.pf.ioctl')
+@patch('sshuttle.methods.pf.pf_get_dev')
+def test_firewall_command_openbsd(mock_pf_get_dev, mock_ioctl, mock_stdout):
+ method = get_method('pf')
+ assert not method.firewall_command("somthing")
+
+ command = "QUERY_PF_NAT %d,%d,%s,%d,%s,%d\n" % (
+ AF_INET, socket.IPPROTO_TCP,
+ "127.0.0.1", 1025, "127.0.0.2", 1024)
+ assert method.firewall_command(command)
+
+ assert mock_pf_get_dev.mock_calls == [call()]
+ assert mock_ioctl.mock_calls == [
+ call(mock_pf_get_dev(), 0xc0504417, ANY),
+ ]
+ assert mock_stdout.mock_calls == [
+ call.write('QUERY_PF_NAT_SUCCESS 0.0.0.0,0\n'),
+ call.flush(),
+ ]
+
+
+def pfctl(args, stdin=None):
+ if args == '-s Interfaces -i lo -v':
+ return (b'lo0 (skip)',)
+ if args == '-s all':
+ return (b'INFO:\nStatus: Disabled\nanother mary had a little lamb\n',
+ b'little lamb\n')
+ if args == '-E':
+ return (b'\n', b'Token : abcdefg\n')
+ return None
+
+
+@patch('sshuttle.helpers.verbose', new=3)
+@patch('sshuttle.methods.pf.pf', Darwin())
+@patch('sshuttle.methods.pf.pfctl')
+@patch('sshuttle.methods.pf.ioctl')
+@patch('sshuttle.methods.pf.pf_get_dev')
+def test_setup_firewall_darwin(mock_pf_get_dev, mock_ioctl, mock_pfctl):
+ mock_pfctl.side_effect = pfctl
+
+ method = get_method('pf')
+ assert method.name == 'pf'
+
+ # IPV6
+
+ method.setup_firewall(
+ 1024, 1026,
+ [(AF_INET6, u'2404:6800:4004:80c::33')],
+ AF_INET6,
+ [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 8000, 9000),
+ (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 8080, 8080)],
+ False,
+ None)
+ assert mock_ioctl.mock_calls == [
+ call(mock_pf_get_dev(), 0xC4704433, ANY),
+ call(mock_pf_get_dev(), 0xCC20441A, ANY),
+ call(mock_pf_get_dev(), 0xCC20441A, ANY),
+ call(mock_pf_get_dev(), 0xC4704433, ANY),
+ call(mock_pf_get_dev(), 0xCC20441A, ANY),
+ call(mock_pf_get_dev(), 0xCC20441A, ANY),
+ ]
+ assert mock_pfctl.mock_calls == [
+ call('-s Interfaces -i lo -v'),
+ call('-f /dev/stdin', b'pass on lo\n'),
+ call('-s all'),
+ call('-a sshuttle6-1024 -f /dev/stdin',
+ b'table <dns_servers> {2404:6800:4004:80c::33}\n'
+ b'rdr pass on lo0 inet6 proto tcp from ! ::1 to '
+ b'2404:6800:4004:80c::/64 port 8000:9000 -> ::1 port 1024\n'
+ b'rdr pass on lo0 inet6 proto udp '
+ b'to <dns_servers> port 53 -> ::1 port 1026\n'
+ b'pass out quick inet6 proto tcp to '
+ b'2404:6800:4004:80c::101f/128 port 8080:8080\n'
+ b'pass out route-to lo0 inet6 proto tcp to '
+ b'2404:6800:4004:80c::/64 port 8000:9000 keep state\n'
+ b'pass out route-to lo0 inet6 proto udp '
+ b'to <dns_servers> port 53 keep state\n'),
+ call('-E'),
+ ]
+ mock_pf_get_dev.reset_mock()
+ mock_ioctl.reset_mock()
+ mock_pfctl.reset_mock()
+
+ with pytest.raises(Exception) as excinfo:
+ method.setup_firewall(
+ 1025, 1027,
+ [(AF_INET, u'1.2.3.33')],
+ AF_INET,
+ [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
+ (AF_INET, 32, True, u'1.2.3.66', 80, 80)],
+ True,
+ None)
+ assert str(excinfo.value) == 'UDP not supported by pf method_name'
+ assert mock_pf_get_dev.mock_calls == []
+ assert mock_ioctl.mock_calls == []
+ assert mock_pfctl.mock_calls == []
+
+ method.setup_firewall(
+ 1025, 1027,
+ [(AF_INET, u'1.2.3.33')],
+ AF_INET,
+ [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
+ (AF_INET, 32, True, u'1.2.3.66', 80, 80)],
+ False,
+ None)
+ assert mock_ioctl.mock_calls == [
+ call(mock_pf_get_dev(), 0xC4704433, ANY),
+ call(mock_pf_get_dev(), 0xCC20441A, ANY),
+ call(mock_pf_get_dev(), 0xCC20441A, ANY),
+ call(mock_pf_get_dev(), 0xC4704433, ANY),
+ call(mock_pf_get_dev(), 0xCC20441A, ANY),
+ call(mock_pf_get_dev(), 0xCC20441A, ANY),
+ ]
+ assert mock_pfctl.mock_calls == [
+ call('-s Interfaces -i lo -v'),
+ call('-f /dev/stdin', b'pass on lo\n'),
+ call('-s all'),
+ call('-a sshuttle-1025 -f /dev/stdin',
+ b'table <dns_servers> {1.2.3.33}\n'
+ b'rdr pass on lo0 inet proto tcp from ! 127.0.0.1 to 1.2.3.0/24 '
+ b'-> 127.0.0.1 port 1025\n'
+ b'rdr pass on lo0 inet proto udp '
+ b'to <dns_servers> port 53 -> 127.0.0.1 port 1027\n'
+ b'pass out quick inet proto tcp to 1.2.3.66/32 port 80:80\n'
+ b'pass out route-to lo0 inet proto tcp to 1.2.3.0/24 keep state\n'
+ b'pass out route-to lo0 inet proto udp '
+ b'to <dns_servers> port 53 keep state\n'),
+ call('-E'),
+ ]
+ mock_pf_get_dev.reset_mock()
+ mock_ioctl.reset_mock()
+ mock_pfctl.reset_mock()
+
+ method.restore_firewall(1025, AF_INET, False, None)
+ assert mock_ioctl.mock_calls == []
+ assert mock_pfctl.mock_calls == [
+ call('-a sshuttle-1025 -F all'),
+ call("-X abcdefg"),
+ ]
+ mock_pf_get_dev.reset_mock()
+ mock_pfctl.reset_mock()
+ mock_ioctl.reset_mock()
+
+
+@patch('sshuttle.helpers.verbose', new=3)
+@patch('sshuttle.methods.pf.pf', FreeBsd())
+@patch('subprocess.call')
+@patch('sshuttle.methods.pf.pfctl')
+@patch('sshuttle.methods.pf.ioctl')
+@patch('sshuttle.methods.pf.pf_get_dev')
+def test_setup_firewall_freebsd(mock_pf_get_dev, mock_ioctl, mock_pfctl,
+ mock_subprocess_call):
+ mock_pfctl.side_effect = pfctl
+
+ method = get_method('pf')
+ assert method.name == 'pf'
+
+ method.setup_firewall(
+ 1024, 1026,
+ [(AF_INET6, u'2404:6800:4004:80c::33')],
+ AF_INET6,
+ [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 8000, 9000),
+ (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 8080, 8080)],
+ False,
+ None)
+
+ assert mock_pfctl.mock_calls == [
+ call('-s all'),
+ call('-a sshuttle6-1024 -f /dev/stdin',
+ b'table <dns_servers> {2404:6800:4004:80c::33}\n'
+ b'rdr pass on lo0 inet6 proto tcp from ! ::1 to '
+ b'2404:6800:4004:80c::/64 port 8000:9000 -> ::1 port 1024\n'
+ b'rdr pass on lo0 inet6 proto udp '
+ b'to <dns_servers> port 53 -> ::1 port 1026\n'
+ b'pass out quick inet6 proto tcp to '
+ b'2404:6800:4004:80c::101f/128 port 8080:8080\n'
+ b'pass out route-to lo0 inet6 proto tcp to '
+ b'2404:6800:4004:80c::/64 port 8000:9000 keep state\n'
+ b'pass out route-to lo0 inet6 proto udp '
+ b'to <dns_servers> port 53 keep state\n'),
+ call('-e'),
+ ]
+ assert call(['kldload', 'pf']) in mock_subprocess_call.mock_calls
+ mock_pf_get_dev.reset_mock()
+ mock_ioctl.reset_mock()
+ mock_pfctl.reset_mock()
+
+ with pytest.raises(Exception) as excinfo:
+ method.setup_firewall(
+ 1025, 1027,
+ [(AF_INET, u'1.2.3.33')],
+ AF_INET,
+ [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
+ (AF_INET, 32, True, u'1.2.3.66', 80, 80)],
+ True,
+ None)
+ assert str(excinfo.value) == 'UDP not supported by pf method_name'
+ assert mock_pf_get_dev.mock_calls == []
+ assert mock_ioctl.mock_calls == []
+ assert mock_pfctl.mock_calls == []
+
+ method.setup_firewall(
+ 1025, 1027,
+ [(AF_INET, u'1.2.3.33')],
+ AF_INET,
+ [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
+ (AF_INET, 32, True, u'1.2.3.66', 80, 80)],
+ False,
+ None)
+ assert mock_ioctl.mock_calls == [
+ call(mock_pf_get_dev(), 0xC4704433, ANY),
+ call(mock_pf_get_dev(), 0xCBE0441A, ANY),
+ call(mock_pf_get_dev(), 0xCBE0441A, ANY),
+ call(mock_pf_get_dev(), 0xC4704433, ANY),
+ call(mock_pf_get_dev(), 0xCBE0441A, ANY),
+ call(mock_pf_get_dev(), 0xCBE0441A, ANY),
+ ]
+ assert mock_pfctl.mock_calls == [
+ call('-s all'),
+ call('-a sshuttle-1025 -f /dev/stdin',
+ b'table <dns_servers> {1.2.3.33}\n'
+ b'rdr pass on lo0 inet proto tcp from ! 127.0.0.1 '
+ b'to 1.2.3.0/24 -> 127.0.0.1 port 1025\n'
+ b'rdr pass on lo0 inet proto udp '
+ b'to <dns_servers> port 53 -> 127.0.0.1 port 1027\n'
+ b'pass out quick inet proto tcp to 1.2.3.66/32 port 80:80\n'
+ b'pass out route-to lo0 inet proto tcp to 1.2.3.0/24 keep state\n'
+ b'pass out route-to lo0 inet proto udp '
+ b'to <dns_servers> port 53 keep state\n'),
+ call('-e'),
+ ]
+ mock_pf_get_dev.reset_mock()
+ mock_ioctl.reset_mock()
+ mock_pfctl.reset_mock()
+
+ method.restore_firewall(1025, AF_INET, False, None)
+ method.restore_firewall(1024, AF_INET6, False, None)
+ assert mock_ioctl.mock_calls == []
+ assert mock_pfctl.mock_calls == [
+ call('-a sshuttle-1025 -F all'),
+ call('-a sshuttle6-1024 -F all'),
+ call("-d"),
+ ]
+ mock_pf_get_dev.reset_mock()
+ mock_pfctl.reset_mock()
+ mock_ioctl.reset_mock()
+
+
+@patch('sshuttle.helpers.verbose', new=3)
+@patch('sshuttle.methods.pf.pf', OpenBsd())
+@patch('sshuttle.methods.pf.pfctl')
+@patch('sshuttle.methods.pf.ioctl')
+@patch('sshuttle.methods.pf.pf_get_dev')
+def test_setup_firewall_openbsd(mock_pf_get_dev, mock_ioctl, mock_pfctl):
+ mock_pfctl.side_effect = pfctl
+
+ method = get_method('pf')
+ assert method.name == 'pf'
+
+ method.setup_firewall(
+ 1024, 1026,
+ [(AF_INET6, u'2404:6800:4004:80c::33')],
+ AF_INET6,
+ [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 8000, 9000),
+ (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 8080, 8080)],
+ False,
+ None)
+
+ assert mock_ioctl.mock_calls == [
+ call(mock_pf_get_dev(), 0xcd48441a, ANY),
+ call(mock_pf_get_dev(), 0xcd48441a, ANY),
+ ]
+ assert mock_pfctl.mock_calls == [
+ call('-s Interfaces -i lo -v'),
+ call('-f /dev/stdin', b'match on lo\n'),
+ call('-s all'),
+ call('-a sshuttle6-1024 -f /dev/stdin',
+ b'table <dns_servers> {2404:6800:4004:80c::33}\n'
+ b'pass in on lo0 inet6 proto tcp to 2404:6800:4004:80c::/64 '
+ b'port 8000:9000 divert-to ::1 port 1024\n'
+ b'pass in on lo0 inet6 proto udp '
+ b'to <dns_servers> port 53 rdr-to ::1 port 1026\n'
+ b'pass out quick inet6 proto tcp to '
+ b'2404:6800:4004:80c::101f/128 port 8080:8080\n'
+ b'pass out inet6 proto tcp to 2404:6800:4004:80c::/64 '
+ b'port 8000:9000 route-to lo0 keep state\n'
+ b'pass out inet6 proto udp to '
+ b'<dns_servers> port 53 route-to lo0 keep state\n'),
+ call('-e'),
+ ]
+ mock_pf_get_dev.reset_mock()
+ mock_ioctl.reset_mock()
+ mock_pfctl.reset_mock()
+
+ with pytest.raises(Exception) as excinfo:
+ method.setup_firewall(
+ 1025, 1027,
+ [(AF_INET, u'1.2.3.33')],
+ AF_INET,
+ [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
+ (AF_INET, 32, True, u'1.2.3.66', 80, 80)],
+ True,
+ None)
+ assert str(excinfo.value) == 'UDP not supported by pf method_name'
+ assert mock_pf_get_dev.mock_calls == []
+ assert mock_ioctl.mock_calls == []
+ assert mock_pfctl.mock_calls == []
+
+ method.setup_firewall(
+ 1025, 1027,
+ [(AF_INET, u'1.2.3.33')],
+ AF_INET,
+ [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
+ (AF_IN