#! /somewhere/python3
from contextlib import contextmanager, _GeneratorContextManager
from queue import Queue, Empty
from typing import Tuple, Any, Callable, Dict, Iterator, Optional, List, Iterable
from xml.sax.saxutils import XMLGenerator
from colorama import Style
import queue
import io
import _thread
import argparse
import atexit
import base64
import codecs
import os
import pathlib
import ptpython.repl
import pty
import re
import shlex
import shutil
import socket
import subprocess
import sys
import telnetlib
import tempfile
import time
import unicodedata
# Lookup table from a single typed character to the key specification used to
# produce it. Characters absent from the table (lower-case letters, digits)
# have no entry here.
# NOTE(review): the "shift-" prefix and hex codes look like QEMU monitor
# `sendkey` names / PC scancodes -- confirm against the code that consumes
# this table.
CHAR_TO_KEY = {
    # Upper-case letters: shift plus the corresponding lower-case key.
    **{
        letter: "shift-" + letter.lower()
        for letter in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    },
    # Whitespace.
    " ": "spc",
    "\n": "ret",
    # Punctuation keys: plain character first, shifted character second.
    "-": "0x0C",
    "_": "shift-0x0C",
    "=": "0x0D",
    "+": "shift-0x0D",
    "[": "0x1A",
    "{": "shift-0x1A",
    "]": "0x1B",
    "}": "shift-0x1B",
    ";": "0x27",
    ":": "shift-0x27",
    "'": "0x28",
    '"': "shift-0x28",
    "`": "0x29",
    "~": "shift-0x29",
    "\\": "0x2B",
    "|": "shift-0x2B",
    ",": "0x33",
    "<": "shift-0x33",
    ".": "0x34",
    ">": "shift-0x34",
    "/": "0x35",
    "?": "shift-0x35",
    # Shifted number row.
    "!": "shift-0x02",
    "@": "shift-0x03",
    "#": "shift-0x04",
    "$": "shift-0x05",
    "%": "shift-0x06",
    "^": "shift-0x07",
    "&": "shift-0x08",
    "*": "shift-0x09",
    "(": "shift-0x0A",
    ")": "shift-0x0B",
}
# Lists the module-level names that functions in this file share (`log` is
# used by create_vlan below). At module scope a `global` statement binds
# nothing and has no runtime effect; it only serves as documentation.
# NOTE(review): presumably these names are assigned later in the file --
# confirm before relying on them at import time.
global log, machines, test_script
def eprint(*args: object, **kwargs: Any) -> None:
    """Print to standard error instead of standard output.

    Takes the same positional and keyword arguments as the built-in
    ``print`` (``sep``, ``end``, ``flush``), except ``file``: the output
    stream is always ``sys.stderr``, so passing ``file`` raises
    ``TypeError``.
    """
    print(*args, file=sys.stderr, **kwargs)
def make_command(args: list) -> str:
    """Build a single shell-safe command line from a list of arguments.

    Each element is converted with ``str`` and escaped with
    ``shlex.quote``; the resulting words are joined with single spaces.
    An empty list yields the empty string.
    """
    return " ".join(shlex.quote(str(arg)) for arg in args)
def create_vlan(vlan_nr: str) -> Tuple[str, str, "subprocess.Popen[bytes]", Any]:
log.log("starting VDE switch for network {}".format(vlan_nr))
vde_socket = tempfile.mkdtemp(
prefix="nixos-test-vde-", suffix="-vde{}.ctl".format(vlan_nr)
)
pty_master, pty_slave = pty.openpty()
vde_process = subprocess.Popen(
["vde_switch", "-s", vde_socket, "--dirmode", "0700"],
stdin=pty_slave,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=False,
)
fd = os.fdopen(pty_master, "w")
fd.write("version\n")
# TODO: the Perl version checks whether this can be read from
# and, if not, dies. We could hang here forever. Fix it.
assert vde_process.stdout is not None
vde_process.stdout.readline()
if not os.path.exists