diff options
author | Andrey Kislyuk <kislyuk@gmail.com> | 2019-06-16 21:54:14 -0700 |
---|---|---|
committer | Andrey Kislyuk <kislyuk@gmail.com> | 2019-06-16 21:54:16 -0700 |
commit | 124235bd59dde0ef3baf63141a26fc77ec5a7ceb (patch) | |
tree | 1e37d671ce8971b78b5b6c870f41daac8c07435e | |
parent | ee307fcfcdac7a8d74ac454d93675d401ab11168 (diff) |
Make main body of yq callable as a library function
Fixes #60
-rwxr-xr-x | setup.py | 2 | ||||
-rwxr-xr-x | test/test.py | 6 | ||||
-rwxr-xr-x | yq/__init__.py | 81 |
3 files changed, 52 insertions, 37 deletions
@@ -27,7 +27,7 @@ setup( include_package_data=True, entry_points={ 'console_scripts': [ - 'yq=yq:main', + 'yq=yq:cli', 'xq=yq:xq_cli' ], }, diff --git a/test/test.py b/test/test.py index 2da6150..dfb41ec 100755 --- a/test/test.py +++ b/test/test.py @@ -6,7 +6,7 @@ from __future__ import absolute_import, division, print_function, unicode_litera import os, sys, unittest, tempfile, json, io, platform, subprocess sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) -from yq import main # noqa +from yq import yq, cli # noqa USING_PYTHON2 = True if sys.version_info < (3, 0) else False USING_PYPY = True if platform.python_implementation() == "PyPy" else False @@ -38,7 +38,7 @@ class TestYq(unittest.TestCase): try: sys.stdin = io.StringIO(input_data) sys.stdout = io.BytesIO() if USING_PYTHON2 else io.StringIO() - main(args, input_format=input_format) + cli(args, input_format=input_format) except SystemExit as e: self.assertIn(e.code, expect_exit_codes) finally: @@ -51,7 +51,7 @@ class TestYq(unittest.TestCase): def test_yq(self): for input_format in "yaml", "xml", "toml": try: - main(["--help"], input_format=input_format) + cli(["--help"], input_format=input_format) except SystemExit as e: self.assertEqual(e.code, 0) self.assertEqual(self.run_yq("{}", ["."]), "") diff --git a/yq/__init__.py b/yq/__init__.py index df33dbc..b18abc3 100755 --- a/yq/__init__.py +++ b/yq/__init__.py @@ -94,64 +94,78 @@ def get_parser(program_name): if sys.version_info >= (3, 5): parser_args.update(allow_abbrev=False) # required to disambiguate options listed in jq_arg_spec parser = Parser(**parser_args) - parser.add_argument("--yaml-output", "--yml-output", "-y", action="store_true", help=yaml_output_help) + parser.add_argument("--output-format", default="json", help=argparse.SUPPRESS) + parser.add_argument("--yaml-output", "--yml-output", "-y", dest="output_format", action="store_const", const="yaml", + help=yaml_output_help) parser.add_argument("--width", "-w", type=int, help=width_help) - parser.add_argument("--xml-output", "-x", action="store_true", help=xml_output_help) + parser.add_argument("--xml-output", "-x", dest="output_format", action="store_const", const="xml", + help=xml_output_help) parser.add_argument("--xml-dtd", action="store_true", help=xml_dtd_help) parser.add_argument("--xml-root", help=xml_root_help) - parser.add_argument("--toml-output", "-t", action="store_true", help=toml_output_help) + parser.add_argument("--toml-output", "-t", dest="output_format", action="store_const", const="toml", + help=toml_output_help) parser.add_argument("--version", action="version", version="%(prog)s {version}".format(version=__version__)) for arg in jq_arg_spec: parser.add_argument(arg, nargs=jq_arg_spec[arg], dest=arg, action="append", help=argparse.SUPPRESS) parser.add_argument("jq_filter") - parser.add_argument("files", nargs="*", type=argparse.FileType()) + parser.add_argument("input_streams", nargs="*", type=argparse.FileType(), metavar="files", default=[sys.stdin]) return parser def xq_cli(): - main(input_format="xml", program_name="xq") + cli(input_format="xml", program_name="xq") def tq_cli(): - main(input_format="toml", program_name="tq") + cli(input_format="toml", program_name="tq") -def main(args=None, input_format="yaml", program_name="yq"): +def cli(args=None, input_format="yaml", program_name="yq"): parser = get_parser(program_name) args, jq_args = parser.parse_known_args(args=args) for arg in jq_arg_spec: values = getattr(args, arg, None) + delattr(args, arg) if values is not None: for value_group in values: jq_args.append(arg) jq_args.extend(value_group) - if getattr(args, "--from-file") or getattr(args, "-f"): - args.files.insert(0, argparse.FileType()(args.jq_filter)) + if "--from-file" in jq_args or "-f" in jq_args: + args.input_streams.insert(0, argparse.FileType()(args.jq_filter)) else: jq_filter_arg_loc = len(jq_args) - if getattr(args, "--args"): + if "--args" in jq_args: jq_filter_arg_loc = jq_args.index('--args') + 1 - elif getattr(args, "--jsonargs"): + elif "--jsonargs" in jq_args: jq_filter_arg_loc = jq_args.index('--jsonargs') + 1 jq_args.insert(jq_filter_arg_loc, args.jq_filter) + delattr(args, "jq_filter") - if sys.stdin.isatty() and not args.files: + if sys.stdin.isatty() and not args.input_streams: return parser.print_help() - converting_output = args.yaml_output or args.xml_output or args.toml_output + yq(input_format=input_format, program_name=program_name, jq_args=jq_args, **vars(args)) + +def yq(input_streams=None, output_stream=None, input_format="yaml", output_format="json", + program_name="yq", width=None, xml_root=None, xml_dtd=False, jq_args=frozenset(), exit_func=None): + if not input_streams: + input_streams = [sys.stdin] + if not output_stream: + output_stream = sys.stdout + if not exit_func: + exit_func = sys.exit + converting_output = True if output_format != "json" else False try: # Note: universal_newlines is just a way to induce subprocess to make stdin a text buffer and encode it for us - jq = subprocess.Popen(["jq"] + jq_args, + jq = subprocess.Popen(["jq"] + list(jq_args), stdin=subprocess.PIPE, stdout=subprocess.PIPE if converting_output else None, universal_newlines=True) except OSError as e: msg = "{}: Error starting jq: {}: {}. Is jq installed and available on PATH?" - parser.exit(msg.format(program_name, type(e).__name__, e)) + exit_func(msg.format(program_name, type(e).__name__, e)) try: - input_streams = args.files if args.files else [sys.stdin] - if converting_output: # TODO: enable true streaming in this branch (with asyncio, asyncproc, a multi-shot variant of # subprocess.Popen._communicate, etc.) @@ -171,41 +185,42 @@ def main(args=None, input_format="yaml", program_name="yq"): input_payload = "\n".join(json.dumps(doc, cls=JSONDateTimeEncoder) for doc in input_docs) jq_out, jq_err = jq.communicate(input_payload) json_decoder = json.JSONDecoder(object_pairs_hook=OrderedDict) - if args.yaml_output: - yaml.dump_all(decode_docs(jq_out, json_decoder), stream=sys.stdout, Dumper=OrderedDumper, - width=args.width, allow_unicode=True, default_flow_style=False) - elif args.xml_output: + if output_format == "yaml": + yaml.dump_all(decode_docs(jq_out, json_decoder), stream=output_stream, Dumper=OrderedDumper, + width=width, allow_unicode=True, default_flow_style=False) + elif output_format == "xml": import xmltodict for doc in decode_docs(jq_out, json_decoder): - if args.xml_root: - doc = {args.xml_root: doc} + if xml_root: + doc = {xml_root: doc} elif not isinstance(doc, OrderedDict): msg = ("{}: Error converting JSON to XML: cannot represent non-object types at top level. " "Use --xml-root=name to envelope your output with a root element.") - parser.exit(msg.format(program_name)) - full_document = True if args.xml_dtd else False + exit_func(msg.format(program_name)) + full_document = True if xml_dtd else False try: - xmltodict.unparse(doc, output=sys.stdout, full_document=full_document, pretty=True, indent=" ") + xmltodict.unparse(doc, output=output_stream, full_document=full_document, pretty=True, + indent=" ") except ValueError as e: if "Document must have exactly one root" in str(e): raise Exception(str(e) + " Use --xml-root=name to envelope your output with a root element") else: raise - sys.stdout.write(b"\n" if sys.version_info < (3, 0) else "\n") - elif args.toml_output: + output_stream.write(b"\n" if sys.version_info < (3, 0) else "\n") + elif output_format == "toml": import toml for doc in decode_docs(jq_out, json_decoder): if not isinstance(doc, OrderedDict): msg = "{}: Error converting JSON to TOML: cannot represent non-object types at top level." - parser.exit(msg.format(program_name)) + exit_func(msg.format(program_name)) if USING_PYTHON2: # For Python 2, dump the string and encode it into bytes. output = toml.dumps(doc) - sys.stdout.write(output.encode("utf-8")) + output_stream.write(output.encode("utf-8")) else: # For Python 3, write the unicode to the buffer directly. - toml.dump(doc, sys.stdout) + toml.dump(doc, output_stream) else: if input_format == "yaml": for input_stream in input_streams: @@ -229,6 +244,6 @@ def main(args=None, input_format="yaml", program_name="yq"): jq.wait() for input_stream in input_streams: input_stream.close() - exit(jq.returncode) + exit_func(jq.returncode) except Exception as e: - parser.exit("{}: Error running jq: {}: {}.".format(program_name, type(e).__name__, e)) + exit_func("{}: Error running jq: {}: {}.".format(program_name, type(e).__name__, e)) |