use-buildset-registry: Vendor pytoml and remarshal

In order to edit the V2 registries.conf file used by podman, we
need to be able to manipulate toml from ansible.  There is no
standard library or Ansible support for that now, and we don't want
to install any python packages on the remote node.  Therefore,
vendor the remarshal and pytoml code into this role.

This is done in a standalone commit for easier review and auditing.

The originating projects are:

  https://github.com/dbohdan/remarshal
  https://github.com/avakar/pytoml

And both are MIT licensed.  Appropriate headers are added where
necessary.

Note that pytoml has been concatenated into one file in order to
adhere to Ansible's requirements for python modules.

Change-Id: I679ea5eb5cb29591be09d2f1b712400c49158abd
This commit is contained in:
James E. Blair 2019-11-19 08:40:32 -08:00
parent 0b0cb18a60
commit ec8a58ddb7
3 changed files with 979 additions and 1 deletions
roles/use-buildset-registry/module_utils
tox.ini

View File

@ -0,0 +1,551 @@
# No-notice MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# Originally from:
# https://github.com/avakar/pytoml
from __future__ import unicode_literals
import datetime
import re, sys
import io, datetime, math, string, sys
try:
from pathlib import PurePath as _path_types
except ImportError:
_path_types = ()
if sys.version_info[0] == 3:
long = int
unicode = str
class TomlError(RuntimeError):
def __init__(self, message, line, col, filename):
RuntimeError.__init__(self, message, line, col, filename)
self.message = message
self.line = line
self.col = col
self.filename = filename
def __str__(self):
return '{}({}, {}): {}'.format(self.filename, self.line, self.col, self.message)
def __repr__(self):
return 'TomlError({!r}, {!r}, {!r}, {!r})'.format(self.message, self.line, self.col, self.filename)
rfc3339_re = re.compile(r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(\.\d+)?(?:Z|([+-]\d{2}):(\d{2}))')
def parse_rfc3339(v):
m = rfc3339_re.match(v)
if not m or m.group(0) != v:
return None
return parse_rfc3339_re(m)
def parse_rfc3339_re(m):
r = map(int, m.groups()[:6])
if m.group(7):
micro = float(m.group(7))
else:
micro = 0
if m.group(8):
g = int(m.group(8), 10) * 60 + int(m.group(9), 10)
tz = _TimeZone(datetime.timedelta(0, g * 60))
else:
tz = _TimeZone(datetime.timedelta(0, 0))
y, m, d, H, M, S = r
return datetime.datetime(y, m, d, H, M, S, int(micro * 1000000), tz)
def format_rfc3339(v):
offs = v.utcoffset()
offs = int(offs.total_seconds()) // 60 if offs is not None else 0
if offs == 0:
suffix = 'Z'
else:
if offs > 0:
suffix = '+'
else:
suffix = '-'
offs = -offs
suffix = '{0}{1:02}:{2:02}'.format(suffix, offs // 60, offs % 60)
if v.microsecond:
return v.strftime('%Y-%m-%dT%H:%M:%S.%f') + suffix
else:
return v.strftime('%Y-%m-%dT%H:%M:%S') + suffix
class _TimeZone(datetime.tzinfo):
def __init__(self, offset):
self._offset = offset
def utcoffset(self, dt):
return self._offset
def dst(self, dt):
return None
def tzname(self, dt):
m = self._offset.total_seconds() // 60
if m < 0:
res = '-'
m = -m
else:
res = '+'
h = m // 60
m = m - h * 60
return '{}{:.02}{:.02}'.format(res, h, m)
if sys.version_info[0] == 2:
_chr = unichr
else:
_chr = chr
def load(fin, translate=lambda t, x, v: v, object_pairs_hook=dict):
return loads(fin.read(), translate=translate, object_pairs_hook=object_pairs_hook, filename=getattr(fin, 'name', repr(fin)))
def loads(s, filename='<string>', translate=lambda t, x, v: v, object_pairs_hook=dict):
if isinstance(s, bytes):
s = s.decode('utf-8')
s = s.replace('\r\n', '\n')
root = object_pairs_hook()
tables = object_pairs_hook()
scope = root
src = _Source(s, filename=filename)
ast = _p_toml(src, object_pairs_hook=object_pairs_hook)
def error(msg):
raise TomlError(msg, pos[0], pos[1], filename)
def process_value(v, object_pairs_hook):
kind, text, value, pos = v
if kind == 'array':
if value and any(k != value[0][0] for k, t, v, p in value[1:]):
error('array-type-mismatch')
value = [process_value(item, object_pairs_hook=object_pairs_hook) for item in value]
elif kind == 'table':
value = object_pairs_hook([(k, process_value(value[k], object_pairs_hook=object_pairs_hook)) for k in value])
return translate(kind, text, value)
for kind, value, pos in ast:
if kind == 'kv':
k, v = value
if k in scope:
error('duplicate_keys. Key "{0}" was used more than once.'.format(k))
scope[k] = process_value(v, object_pairs_hook=object_pairs_hook)
else:
is_table_array = (kind == 'table_array')
cur = tables
for name in value[:-1]:
if isinstance(cur.get(name), list):
d, cur = cur[name][-1]
else:
d, cur = cur.setdefault(name, (None, object_pairs_hook()))
scope = object_pairs_hook()
name = value[-1]
if name not in cur:
if is_table_array:
cur[name] = [(scope, object_pairs_hook())]
else:
cur[name] = (scope, object_pairs_hook())
elif isinstance(cur[name], list):
if not is_table_array:
error('table_type_mismatch')
cur[name].append((scope, object_pairs_hook()))
else:
if is_table_array:
error('table_type_mismatch')
old_scope, next_table = cur[name]
if old_scope is not None:
error('duplicate_tables')
cur[name] = (scope, next_table)
def merge_tables(scope, tables):
if scope is None:
scope = object_pairs_hook()
for k in tables:
if k in scope:
error('key_table_conflict')
v = tables[k]
if isinstance(v, list):
scope[k] = [merge_tables(sc, tbl) for sc, tbl in v]
else:
scope[k] = merge_tables(v[0], v[1])
return scope
return merge_tables(root, tables)
class _Source:
def __init__(self, s, filename=None):
self.s = s
self._pos = (1, 1)
self._last = None
self._filename = filename
self.backtrack_stack = []
def last(self):
return self._last
def pos(self):
return self._pos
def fail(self):
return self._expect(None)
def consume_dot(self):
if self.s:
self._last = self.s[0]
self.s = self[1:]
self._advance(self._last)
return self._last
return None
def expect_dot(self):
return self._expect(self.consume_dot())
def consume_eof(self):
if not self.s:
self._last = ''
return True
return False
def expect_eof(self):
return self._expect(self.consume_eof())
def consume(self, s):
if self.s.startswith(s):
self.s = self.s[len(s):]
self._last = s
self._advance(s)
return True
return False
def expect(self, s):
return self._expect(self.consume(s))
def consume_re(self, re):
m = re.match(self.s)
if m:
self.s = self.s[len(m.group(0)):]
self._last = m
self._advance(m.group(0))
return m
return None
def expect_re(self, re):
return self._expect(self.consume_re(re))
def __enter__(self):
self.backtrack_stack.append((self.s, self._pos))
def __exit__(self, type, value, traceback):
if type is None:
self.backtrack_stack.pop()
else:
self.s, self._pos = self.backtrack_stack.pop()
return type == TomlError
def commit(self):
self.backtrack_stack[-1] = (self.s, self._pos)
def _expect(self, r):
if not r:
raise TomlError('msg', self._pos[0], self._pos[1], self._filename)
return r
def _advance(self, s):
suffix_pos = s.rfind('\n')
if suffix_pos == -1:
self._pos = (self._pos[0], self._pos[1] + len(s))
else:
self._pos = (self._pos[0] + s.count('\n'), len(s) - suffix_pos)
_ews_re = re.compile(r'(?:[ \t]|#[^\n]*\n|#[^\n]*\Z|\n)*')
def _p_ews(s):
s.expect_re(_ews_re)
_ws_re = re.compile(r'[ \t]*')
def _p_ws(s):
s.expect_re(_ws_re)
_escapes = { 'b': '\b', 'n': '\n', 'r': '\r', 't': '\t', '"': '"',
'\\': '\\', 'f': '\f' }
_basicstr_re = re.compile(r'[^"\\\000-\037]*')
_short_uni_re = re.compile(r'u([0-9a-fA-F]{4})')
_long_uni_re = re.compile(r'U([0-9a-fA-F]{8})')
_escapes_re = re.compile(r'[btnfr\"\\]')
_newline_esc_re = re.compile('\n[ \t\n]*')
def _p_basicstr_content(s, content=_basicstr_re):
res = []
while True:
res.append(s.expect_re(content).group(0))
if not s.consume('\\'):
break
if s.consume_re(_newline_esc_re):
pass
elif s.consume_re(_short_uni_re) or s.consume_re(_long_uni_re):
v = int(s.last().group(1), 16)
if 0xd800 <= v < 0xe000:
s.fail()
res.append(_chr(v))
else:
s.expect_re(_escapes_re)
res.append(_escapes[s.last().group(0)])
return ''.join(res)
_key_re = re.compile(r'[0-9a-zA-Z-_]+')
def _p_key(s):
with s:
s.expect('"')
r = _p_basicstr_content(s, _basicstr_re)
s.expect('"')
return r
if s.consume('\''):
if s.consume('\'\''):
s.consume('\n')
r = s.expect_re(_litstr_ml_re).group(0)
s.expect('\'\'\'')
else:
r = s.expect_re(_litstr_re).group(0)
s.expect('\'')
return r
return s.expect_re(_key_re).group(0)
_float_re = re.compile(r'[+-]?(?:0|[1-9](?:_?\d)*)(?:\.\d(?:_?\d)*)?(?:[eE][+-]?(?:\d(?:_?\d)*))?')
_basicstr_ml_re = re.compile(r'(?:""?(?!")|[^"\\\000-\011\013-\037])*')
_litstr_re = re.compile(r"[^'\000\010\012-\037]*")
_litstr_ml_re = re.compile(r"(?:(?:|'|'')(?:[^'\000-\010\013-\037]))*")
def _p_value(s, object_pairs_hook):
pos = s.pos()
if s.consume('true'):
return 'bool', s.last(), True, pos
if s.consume('false'):
return 'bool', s.last(), False, pos
if s.consume('"'):
if s.consume('""'):
s.consume('\n')
r = _p_basicstr_content(s, _basicstr_ml_re)
s.expect('"""')
else:
r = _p_basicstr_content(s, _basicstr_re)
s.expect('"')
return 'str', r, r, pos
if s.consume('\''):
if s.consume('\'\''):
s.consume('\n')
r = s.expect_re(_litstr_ml_re).group(0)
s.expect('\'\'\'')
else:
r = s.expect_re(_litstr_re).group(0)
s.expect('\'')
return 'str', r, r, pos
if s.consume_re(rfc3339_re):
m = s.last()
return 'datetime', m.group(0), parse_rfc3339_re(m), pos
if s.consume_re(_float_re):
m = s.last().group(0)
r = m.replace('_','')
if '.' in m or 'e' in m or 'E' in m:
return 'float', m, float(r), pos
else:
return 'int', m, int(r, 10), pos
if s.consume('['):
items = []
with s:
while True:
_p_ews(s)
items.append(_p_value(s, object_pairs_hook=object_pairs_hook))
s.commit()
_p_ews(s)
s.expect(',')
s.commit()
_p_ews(s)
s.expect(']')
return 'array', None, items, pos
if s.consume('{'):
_p_ws(s)
items = object_pairs_hook()
if not s.consume('}'):
k = _p_key(s)
_p_ws(s)
s.expect('=')
_p_ws(s)
items[k] = _p_value(s, object_pairs_hook=object_pairs_hook)
_p_ws(s)
while s.consume(','):
_p_ws(s)
k = _p_key(s)
_p_ws(s)
s.expect('=')
_p_ws(s)
items[k] = _p_value(s, object_pairs_hook=object_pairs_hook)
_p_ws(s)
s.expect('}')
return 'table', None, items, pos
s.fail()
def _p_stmt(s, object_pairs_hook):
pos = s.pos()
if s.consume( '['):
is_array = s.consume('[')
_p_ws(s)
keys = [_p_key(s)]
_p_ws(s)
while s.consume('.'):
_p_ws(s)
keys.append(_p_key(s))
_p_ws(s)
s.expect(']')
if is_array:
s.expect(']')
return 'table_array' if is_array else 'table', keys, pos
key = _p_key(s)
_p_ws(s)
s.expect('=')
_p_ws(s)
value = _p_value(s, object_pairs_hook=object_pairs_hook)
return 'kv', (key, value), pos
_stmtsep_re = re.compile(r'(?:[ \t]*(?:#[^\n]*)?\n)+[ \t]*')
def _p_toml(s, object_pairs_hook):
stmts = []
_p_ews(s)
with s:
stmts.append(_p_stmt(s, object_pairs_hook=object_pairs_hook))
while True:
s.commit()
s.expect_re(_stmtsep_re)
stmts.append(_p_stmt(s, object_pairs_hook=object_pairs_hook))
_p_ews(s)
s.expect_eof()
return stmts
def dumps(obj, sort_keys=False):
fout = io.StringIO()
dump(obj, fout, sort_keys=sort_keys)
return fout.getvalue()
_escapes = {'\n': 'n', '\r': 'r', '\\': '\\', '\t': 't', '\b': 'b', '\f': 'f', '"': '"'}
def _escape_string(s):
res = []
start = 0
def flush():
if start != i:
res.append(s[start:i])
return i + 1
i = 0
while i < len(s):
c = s[i]
if c in '"\\\n\r\t\b\f':
start = flush()
res.append('\\' + _escapes[c])
elif ord(c) < 0x20:
start = flush()
res.append('\\u%04x' % ord(c))
i += 1
flush()
return '"' + ''.join(res) + '"'
_key_chars = string.digits + string.ascii_letters + '-_'
def _escape_id(s):
if any(c not in _key_chars for c in s):
return _escape_string(s)
return s
def _format_value(v):
if isinstance(v, bool):
return 'true' if v else 'false'
if isinstance(v, int) or isinstance(v, long):
return unicode(v)
if isinstance(v, float):
if math.isnan(v) or math.isinf(v):
raise ValueError("{0} is not a valid TOML value".format(v))
else:
return repr(v)
elif isinstance(v, unicode) or isinstance(v, bytes):
return _escape_string(v)
elif isinstance(v, datetime.datetime):
return format_rfc3339(v)
elif isinstance(v, list):
return '[{0}]'.format(', '.join(_format_value(obj) for obj in v))
elif isinstance(v, dict):
return '{{{0}}}'.format(', '.join('{} = {}'.format(_escape_id(k), _format_value(obj)) for k, obj in v.items()))
elif isinstance(v, _path_types):
return _escape_string(str(v))
else:
raise RuntimeError(v)
def dump(obj, fout, sort_keys=False):
tables = [((), obj, False)]
while tables:
name, table, is_array = tables.pop()
if name:
section_name = '.'.join(_escape_id(c) for c in name)
if is_array:
fout.write('[[{0}]]\n'.format(section_name))
else:
fout.write('[{0}]\n'.format(section_name))
table_keys = sorted(table.keys()) if sort_keys else table.keys()
new_tables = []
has_kv = False
for k in table_keys:
v = table[k]
if isinstance(v, dict):
new_tables.append((name + (k,), v, False))
elif isinstance(v, list) and v and all(isinstance(o, dict) for o in v):
new_tables.extend((name + (k,), d, True) for d in v)
elif v is None:
# based on mojombo's comment: https://github.com/toml-lang/toml/issues/146#issuecomment-25019344
fout.write(
'#{} = null # To use: uncomment and replace null with value\n'.format(_escape_id(k)))
has_kv = True
else:
fout.write('{0} = {1}\n'.format(_escape_id(k), _format_value(v)))
has_kv = True
tables.extend(reversed(new_tables))
if (name or has_kv) and tables:
fout.write('\n')

View File

@ -0,0 +1,418 @@
#! /usr/bin/env python3
# remarshal, a utility to convert between serialization formats.
# Copyright (c) 2014, 2015, 2016, 2017, 2018, 2019 dbohdan
# License: MIT
# Originally from:
# https://github.com/dbohdan/remarshal
from __future__ import print_function
import argparse
import datetime
# import dateutil.parser
import io
import json
import os.path
import re
import string
import sys
import test
from ansible.module_utils.pytoml import loads as pytoml_loads
from ansible.module_utils.pytoml import dumps as pytoml_dumps
from ansible.module_utils.pytoml import TomlError
# import umsgpack
# import yaml
from collections import OrderedDict
__version__ = '0.11.2'
FORMATS = ['json', 'msgpack', 'toml', 'yaml']
# === JSON ===
if hasattr(json, 'JSONDecodeError'):
JSONDecodeError = json.JSONDecodeError
else:
JSONDecodeError = ValueError
def json_default(obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat()
raise TypeError("{0} is not JSON-serializable".format(repr(obj)))
# === CLI ===
def argv0_to_format(argv0):
possible_format = '(' + '|'.join(FORMATS) + ')'
match = re.search('^' + possible_format + '2' + possible_format, argv0)
if match:
from_, to = match.groups()
return True, from_, to
else:
return False, None, None
def extension_to_format(path):
_, ext = os.path.splitext(path)
ext = ext[1:]
if ext == 'yml':
ext = 'yaml'
return ext if ext in FORMATS else None
def parse_command_line(argv):
me = os.path.basename(argv[0])
format_from_argv0, argv0_from, argv0_to = argv0_to_format(me)
parser = argparse.ArgumentParser(
description='Convert between TOML, MessagePack, YAML, and JSON.'
)
input_group = parser.add_mutually_exclusive_group()
input_group.add_argument(
'input',
nargs='?',
default='-',
help='input file'
)
input_group.add_argument(
'-i', '--input',
dest='input_flag',
metavar='input',
default=None,
help='input file'
)
output_group = parser.add_mutually_exclusive_group()
output_group.add_argument(
'output',
nargs='?',
default='-',
help='input file'
)
output_group.add_argument(
'-o', '--output',
dest='output_flag',
metavar='output',
default=None,
help='output file'
)
if not format_from_argv0:
parser.add_argument(
'--if', '-if', '--input-format',
dest='input_format',
help="input format",
choices=FORMATS
)
parser.add_argument(
'--of',
'-of',
'--output-format',
dest='output_format',
help="output format",
choices=FORMATS
)
if not format_from_argv0 or argv0_to == 'json':
parser.add_argument(
'--indent-json',
dest='indent_json',
metavar='n',
type=int,
default=None,
help='indent JSON output'
)
if not format_from_argv0 or argv0_to == 'yaml':
parser.add_argument(
'--yaml-style',
dest='yaml_style',
default=None,
help='YAML formatting style',
choices=['', '\'', '"', '|', '>']
)
parser.add_argument(
'--wrap',
dest='wrap',
metavar='key',
default=None,
help='wrap the data in a map type with the given key'
)
parser.add_argument(
'--unwrap',
dest='unwrap',
metavar='key',
default=None,
help='only output the data stored under the given key'
)
parser.add_argument(
'-p', '--preserve-key-order',
dest='ordered',
action='store_true',
help='preserve the order of dictionary/mapping keys'
)
parser.add_argument(
'-v', '--version',
action='version',
version=__version__
)
args = parser.parse_args(args=argv[1:])
# Use the positional input and output arguments.
if args.input_flag is not None:
args.input = args.input_flag
if args.output_flag is not None:
args.output = args.output_flag
# Determine the implicit input and output format if possible.
if format_from_argv0:
args.input_format = argv0_from
args.output_format = argv0_to
if argv0_to != 'json':
args.__dict__['indent_json'] = None
if argv0_to != 'yaml':
args.__dict__['yaml_style'] = None
else:
if args.input_format is None:
args.input_format = extension_to_format(args.input)
if args.input_format is None:
parser.error('Need an explicit input format')
if args.output_format is None:
args.output_format = extension_to_format(args.output)
if args.output_format is None:
parser.error('Need an explicit output format')
# Wrap yaml_style.
args.__dict__['yaml_options'] = {'default_style': args.yaml_style}
del args.__dict__['yaml_style']
return args
# === Parser/serializer wrappers ===
def decode_json(input_data, ordered):
try:
pairs_hook = OrderedDict if ordered else dict
return json.loads(
input_data.decode('utf-8'),
object_pairs_hook=pairs_hook
)
except JSONDecodeError as e:
raise ValueError('Cannot parse as JSON ({0})'.format(e))
def decode_msgpack(input_data, ordered):
try:
return umsgpack.unpackb(input_data, use_ordered_dict=ordered)
except umsgpack.UnpackException as e:
raise ValueError('Cannot parse as MessagePack ({0})'.format(e))
def decode_toml(input_data, ordered):
try:
pairs_hook = OrderedDict if ordered else dict
return pytoml_loads(
input_data,
object_pairs_hook=pairs_hook
)
except TomlError as e:
raise ValueError('Cannot parse as TOML ({0})'.format(e))
def decode(input_format, input_data, ordered):
decoder = {
'json': decode_json,
'msgpack': decode_msgpack,
'toml': decode_toml,
}
if input_format not in decoder:
raise ValueError('Unknown input format: {0}'.format(input_format))
return decoder[input_format](input_data, ordered)
def encode_json(data, ordered, indent):
if indent is True:
indent = 2
if indent:
separators = (',', ': ')
else:
separators = (',', ':')
try:
return json.dumps(
data,
default=json_default,
ensure_ascii=False,
indent=indent,
separators=separators,
sort_keys=not ordered
) + "\n"
except TypeError as e:
raise ValueError('Cannot convert data to JSON ({0})'.format(e))
def traverse(
col,
dict_callback=lambda x: x,
list_callback=lambda x: x,
key_callback=lambda x: x,
value_callback=lambda x: x
):
if isinstance(col, dict):
return dict_callback(col.__class__([
(key_callback(k), traverse(
v,
dict_callback,
list_callback,
key_callback,
value_callback
)) for (k, v) in col.items()
]))
elif isinstance(col, list):
return list_callback([traverse(
x,
dict_callback,
list_callback,
key_callback,
value_callback
) for x in col])
else:
return value_callback(col)
def encode_msgpack(data):
try:
return umsgpack.packb(data)
except umsgpack.UnsupportedTypeException as e:
raise ValueError('Cannot convert data to MessagePack ({0})'.format(e))
def encode_toml(data, ordered):
try:
return pytoml_dumps(data, sort_keys=not ordered)
except AttributeError as e:
if str(e) == "'list' object has no attribute 'keys'":
raise ValueError(
'Cannot convert non-dictionary data to '
'TOML; use "wrap" to wrap it in a '
'dictionary'
)
else:
raise e
except TypeError as e:
if str(e) == "'in <string>' requires string as left operand, not int":
raise ValueError('Cannot convert binary to TOML')
else:
raise ValueError('Cannot convert data to TOML ({0})'.format(e))
# === Main ===
def run(argv):
args = parse_command_line(argv)
remarshal(
args.input,
args.output,
args.input_format,
args.output_format,
args.wrap,
args.unwrap,
args.indent_json,
args.yaml_options,
args.ordered
)
def remarshal(
input,
output,
input_format,
output_format,
wrap=None,
unwrap=None,
indent_json=None,
yaml_options={},
ordered=False,
transform=None,
):
try:
if input == '-':
input_file = getattr(sys.stdin, 'buffer', sys.stdin)
else:
input_file = open(input, 'rb')
if output == '-':
output_file = getattr(sys.stdout, 'buffer', sys.stdout)
else:
output_file = open(output, 'wb')
input_data = input_file.read()
parsed = decode(input_format, input_data, ordered)
if unwrap is not None:
parsed = parsed[unwrap]
if wrap is not None:
temp = {}
temp[wrap] = parsed
parsed = temp
if transform:
parsed = transform(parsed)
if output_format == 'json':
output_data = encode_json(parsed, ordered, indent_json)
elif output_format == 'msgpack':
output_data = encode_msgpack(parsed)
elif output_format == 'toml':
output_data = encode_toml(parsed, ordered)
else:
raise ValueError(
'Unknown output format: {0}'.format(output_format)
)
if output_format == 'msgpack':
encoded = output_data
else:
encoded = output_data.encode('utf-8')
output_file.write(encoded)
output_file.close()
finally:
if 'input_file' in locals():
input_file.close()
if 'output_file' in locals():
output_file.close()
def main():
try:
run(sys.argv)
except KeyboardInterrupt as e:
pass
except (IOError, ValueError) as e:
print('Error: {0}'.format(e), file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()

11
tox.ini
View File

@ -63,4 +63,13 @@ commands = {posargs}
# W503 and W504 enabled
ignore = E125,E129,E402,E741,W504,H
show-source = True
exclude = .venv,.tox,dist,doc,build,*.egg
exclude =
.venv,
.tox,
dist,
doc,
build,
*.egg,
# vendored files
roles/use-buildset-registry/module_utils/pytoml.py,
roles/use-buildset-registry/module_utils/remarshal.py