From 239fa1e3c7aac78599145c670576f0ac76a41a89 Mon Sep 17 00:00:00 2001 From: Martin Geisler Date: Tue, 19 May 2015 19:21:56 +0200 Subject: [PATCH] Try parsing posh-ohai output as XML Normally, PoSh-Ohai will return it's data as JSON. However, if PowerShell version 3 is not available, we get data back as XML. We now attempt to parse the returned text XML if we cannot parse it as JSON first. We raise a SystemInfoInvalid exception if we cannot parse the data as either JSON or XML. The SystemInfoNotJson and SystemInfoMissingJson exceptions are no longer raised since they don't make sense when the command can return two formats. Change-Id: I6972595e6322d35c99fb6297565625fdb1cd951e --- satori/errors.py | 5 + satori/sysinfo/posh_ohai.py | 176 ++++++++++++++++++++++--- satori/tests/test_sysinfo_posh_ohai.py | 84 ++++++++++++ 3 files changed, 246 insertions(+), 19 deletions(-) create mode 100644 satori/tests/test_sysinfo_posh_ohai.py diff --git a/satori/errors.py b/satori/errors.py index b1e449b..06bc494 100644 --- a/satori/errors.py +++ b/satori/errors.py @@ -106,6 +106,11 @@ class SystemInfoMissingJson(DiscoveryException): """Command did not produce stdout containing JSON.""" +class SystemInfoInvalid(DiscoveryException): + + """Command did not produce valid JSON or XML.""" + + class SystemInfoCommandInstallFailed(DiscoveryException): """Failed to install package that provides system information.""" diff --git a/satori/sysinfo/posh_ohai.py b/satori/sysinfo/posh_ohai.py index bbdbecb..2d06777 100644 --- a/satori/sysinfo/posh_ohai.py +++ b/satori/sysinfo/posh_ohai.py @@ -15,6 +15,7 @@ import json import logging +import xml.etree.ElementTree as ET import ipaddress as ipaddress_module import six @@ -56,13 +57,11 @@ def system_info(client, with_install=False): :param client: :class:`smb.SMB` instance :returns: dict -- system information from PoSh-Ohai - :raises: SystemInfoCommandMissing, SystemInfoCommandOld, SystemInfoNotJson - SystemInfoMissingJson + :raises: SystemInfoCommandMissing, SystemInfoCommandOld, SystemInfoInvalid SystemInfoCommandMissing if `posh-ohai` is not installed. SystemInfoCommandOld if `posh-ohai` is not the latest. - SystemInfoNotJson if `posh-ohai` does not return valid JSON. - SystemInfoMissingJson if `posh-ohai` does not return any JSON. + SystemInfoInvalid if `posh-ohai` does not return valid JSON or XML. """ if with_install: perform_install(client) @@ -71,15 +70,14 @@ def system_info(client, with_install=False): powershell_command = 'Get-ComputerConfiguration' output = client.execute(powershell_command) unicode_output = "%s" % output - try: - results = json.loads(unicode_output) - except ValueError: + load_clean_json = lambda output: json.loads(get_json(output)) + last_err = None + for loader in json.loads, parse_xml, load_clean_json: try: - clean_output = get_json(unicode_output) - results = json.loads(clean_output) + return loader(unicode_output) except ValueError as err: - raise errors.SystemInfoNotJson(err) - return results + last_err = err + raise errors.SystemInfoInvalid(last_err) else: raise errors.PlatformNotSupported( "PoSh-Ohai is a Windows-only sytem info provider. " @@ -141,14 +139,154 @@ def get_json(data): :param data: :string: :returns: string -- JSON string stripped of non-JSON data - :raises: SystemInfoMissingJson + """ + first = data.index('{') + last = data.rindex('}') + return data[first:last + 1] - SystemInfoMissingJson if no JSON is returned. + +def parse_text(elem): + """Parse text from an element. + + >>> parse_text(ET.XML('Hello World')) + 'Hello World' + >>> parse_text(ET.XML('True ')) + True + >>> parse_text(ET.XML('123')) + 123 + >>> print(parse_text(ET.XML(''))) + None + """ + if elem.text is None: + return None + try: + return int(elem.text) + except ValueError: + pass + text = elem.text.strip() + if text == 'True': + return True + if text == 'False': + return False + return elem.text + + +def parse_list(elem): + """Parse list of properties. + + >>> parse_list(ET.XML('')) + [] + >>> xml = ''' + ... Hello + ... World + ... ''' + >>> parse_list(ET.XML(xml)) + ['Hello', 'World'] + """ + return [parse_elem(c) for c in elem] + + +def parse_attrib_dict(elem): + """Parse list of properties. + + >>> parse_attrib_dict(ET.XML('')) + {} + >>> xml = ''' + ... Hello + ... World + ... ''' + >>> d = parse_attrib_dict(ET.XML(xml)) + >>> sorted(d.items()) + [('noun', 'World'), ('verb', 'Hello')] + """ + keys = [c.get('Name') for c in elem] + values = [parse_elem(c) for c in elem] + return dict(zip(keys, values)) + + +def parse_key_value_dict(elem): + """Parse list of properties. + + >>> parse_key_value_dict(ET.XML('')) + {} + >>> xml = ''' + ... verb + ... Hello + ... noun + ... World + ... ''' + >>> d = parse_key_value_dict(ET.XML(xml)) + >>> sorted(d.items()) + [('noun', 'World'), ('verb', 'Hello')] + """ + keys = [c.text for c in elem[::2]] + values = [parse_elem(c) for c in elem[1::2]] + return dict(zip(keys, values)) + + +def parse_elem(elem): + """Determine element type and dispatch to other parse functions.""" + if len(elem) == 0: + return parse_text(elem) + if not elem[0].attrib: + return parse_list(elem) + if elem[0].get('Name') == 'Key': + return parse_key_value_dict(elem) + return parse_attrib_dict(elem) + + +def parse_xml(ohai_output): + r"""Parse XML Posh-Ohai output. + + >>> output = '''\ + ... + ... + ... + ... platform_family + ... Windows + ... logonhistory + ... + ... 0x6dd0359 + ... + ... user + ... WIN2008R2\\Administrator + ... logontype + ... 10 + ... + ... + ... loggedon_users + ... + ... + ... 995 + ... WIN2008R2\IUSR + ... Service + ... + ... + ... 999 + ... WIN2008R2\SYSTEM + ... Local System + ... + ... + ... + ... ''' + >>> import pprint + >>> pprint.pprint(parse_xml(output)) + {'loggedon_users': [{'Session': 995, + 'Type': 'Service', + 'User': 'WIN2008R2\\IUSR'}, + {'Session': 999, + 'Type': 'Local System', + 'User': 'WIN2008R2\\SYSTEM'}], + 'logonhistory': {'0x6dd0359': {'logontype': 10, + 'user': 'WIN2008R2\\Administrator'}}, + 'platform_family': 'Windows'} """ try: - first = data.index('{') - last = data.rindex('}') - return data[first:last + 1] - except ValueError as exc: - context = {"ValueError": "%s" % exc} - raise errors.SystemInfoMissingJson(context) + root = ET.XML(ohai_output) + except ET.ParseError as err: + raise ValueError(err) + try: + properties = root[0] + except IndexError as err: + raise ValueError('XML had unexpected structure') + return parse_elem(properties) diff --git a/satori/tests/test_sysinfo_posh_ohai.py b/satori/tests/test_sysinfo_posh_ohai.py new file mode 100644 index 0000000..d5a4cd3 --- /dev/null +++ b/satori/tests/test_sysinfo_posh_ohai.py @@ -0,0 +1,84 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +"""Test PoSh-Ohai Plugin.""" + +import doctest +import unittest + +import mock + +from satori import errors +from satori.sysinfo import posh_ohai +from satori.tests import utils + + +def load_tests(loader, tests, ignore): + """Include doctests as unit tests.""" + tests.addTests(doctest.DocTestSuite(posh_ohai)) + return tests + + +class TestSystemInfo(utils.TestCase): + + def setUp(self): + super(TestSystemInfo, self).setUp() + self.client = mock.MagicMock() + self.client.is_windows.return_value = True + + def test_system_info(self): + self.client.execute.return_value = "{}" + posh_ohai.system_info(self.client) + self.client.execute.assert_called_with("Get-ComputerConfiguration") + + def test_system_info_json(self): + self.client.execute.return_value = '{"foo": 123}' + self.assertEqual(posh_ohai.system_info(self.client), {'foo': 123}) + + def test_system_info_json_with_motd(self): + self.client.execute.return_value = "Hello world\n {}" + self.assertEqual(posh_ohai.system_info(self.client), {}) + + def test_system_info_xml(self): + valid_xml = '''" + " + platform_family + Windows + + ''' + self.client.execute.return_value = valid_xml + self.assertEqual(posh_ohai.system_info(self.client), + {'platform_family': 'Windows'}) + + def test_system_info_bad_json(self): + self.client.execute.return_value = "{Not JSON!}" + self.assertRaises(errors.SystemInfoInvalid, + posh_ohai.system_info, self.client) + + def test_system_info_bad_xml(self): + self.client.execute.return_value = "" + self.assertRaises(errors.SystemInfoInvalid, + posh_ohai.system_info, self.client) + + def test_system_info_bad_xml(self): + self.client.execute.return_value = "bad structure" + self.assertRaises(errors.SystemInfoInvalid, + posh_ohai.system_info, self.client) + + def test_system_info_invalid(self): + self.client.execute.return_value = "No JSON and not XML!" + self.assertRaises(errors.SystemInfoInvalid, + posh_ohai.system_info, self.client) + + +if __name__ == "__main__": + unittest.main()