Try parsing posh-ohai output as XML

Normally, PoSh-Ohai will return it's data as JSON. However, if
PowerShell version 3 is not available, we get data back as XML.

We now attempt to parse the returned text XML if we cannot parse it as
JSON first. We raise a SystemInfoInvalid exception if we cannot parse
the data as either JSON or XML. The SystemInfoNotJson and
SystemInfoMissingJson exceptions are no longer raised since they don't
make sense when the command can return two formats.

Change-Id: I6972595e6322d35c99fb6297565625fdb1cd951e
This commit is contained in:
Martin Geisler 2015-05-19 19:21:56 +02:00
parent 4b68fa3b4d
commit 239fa1e3c7
3 changed files with 246 additions and 19 deletions

View File

@ -106,6 +106,11 @@ class SystemInfoMissingJson(DiscoveryException):
"""Command did not produce stdout containing JSON."""
class SystemInfoInvalid(DiscoveryException):
"""Command did not produce valid JSON or XML."""
class SystemInfoCommandInstallFailed(DiscoveryException):
"""Failed to install package that provides system information."""

View File

@ -15,6 +15,7 @@
import json
import logging
import xml.etree.ElementTree as ET
import ipaddress as ipaddress_module
import six
@ -56,13 +57,11 @@ def system_info(client, with_install=False):
:param client: :class:`smb.SMB` instance
:returns: dict -- system information from PoSh-Ohai
:raises: SystemInfoCommandMissing, SystemInfoCommandOld, SystemInfoNotJson
SystemInfoMissingJson
:raises: SystemInfoCommandMissing, SystemInfoCommandOld, SystemInfoInvalid
SystemInfoCommandMissing if `posh-ohai` is not installed.
SystemInfoCommandOld if `posh-ohai` is not the latest.
SystemInfoNotJson if `posh-ohai` does not return valid JSON.
SystemInfoMissingJson if `posh-ohai` does not return any JSON.
SystemInfoInvalid if `posh-ohai` does not return valid JSON or XML.
"""
if with_install:
perform_install(client)
@ -71,15 +70,14 @@ def system_info(client, with_install=False):
powershell_command = 'Get-ComputerConfiguration'
output = client.execute(powershell_command)
unicode_output = "%s" % output
try:
results = json.loads(unicode_output)
except ValueError:
load_clean_json = lambda output: json.loads(get_json(output))
last_err = None
for loader in json.loads, parse_xml, load_clean_json:
try:
clean_output = get_json(unicode_output)
results = json.loads(clean_output)
return loader(unicode_output)
except ValueError as err:
raise errors.SystemInfoNotJson(err)
return results
last_err = err
raise errors.SystemInfoInvalid(last_err)
else:
raise errors.PlatformNotSupported(
"PoSh-Ohai is a Windows-only sytem info provider. "
@ -141,14 +139,154 @@ def get_json(data):
:param data: :string:
:returns: string -- JSON string stripped of non-JSON data
:raises: SystemInfoMissingJson
"""
first = data.index('{')
last = data.rindex('}')
return data[first:last + 1]
SystemInfoMissingJson if no JSON is returned.
def parse_text(elem):
"""Parse text from an element.
>>> parse_text(ET.XML('<Property>Hello World</Property>'))
'Hello World'
>>> parse_text(ET.XML('<Property>True </Property>'))
True
>>> parse_text(ET.XML('<Property>123</Property>'))
123
>>> print(parse_text(ET.XML('<Property />')))
None
"""
if elem.text is None:
return None
try:
return int(elem.text)
except ValueError:
pass
text = elem.text.strip()
if text == 'True':
return True
if text == 'False':
return False
return elem.text
def parse_list(elem):
"""Parse list of properties.
>>> parse_list(ET.XML('<Property />'))
[]
>>> xml = '''<Property>
... <Property>Hello</Property>
... <Property>World</Property>
... </Property>'''
>>> parse_list(ET.XML(xml))
['Hello', 'World']
"""
return [parse_elem(c) for c in elem]
def parse_attrib_dict(elem):
"""Parse list of properties.
>>> parse_attrib_dict(ET.XML('<Property />'))
{}
>>> xml = '''<Property>
... <Property Name="verb">Hello</Property>
... <Property Name="noun">World</Property>
... </Property>'''
>>> d = parse_attrib_dict(ET.XML(xml))
>>> sorted(d.items())
[('noun', 'World'), ('verb', 'Hello')]
"""
keys = [c.get('Name') for c in elem]
values = [parse_elem(c) for c in elem]
return dict(zip(keys, values))
def parse_key_value_dict(elem):
"""Parse list of properties.
>>> parse_key_value_dict(ET.XML('<Property />'))
{}
>>> xml = '''<Property>
... <Property Name="Key">verb</Property>
... <Property Name="Value">Hello</Property>
... <Property Name="Key">noun</Property>
... <Property Name="Value">World</Property>
... </Property>'''
>>> d = parse_key_value_dict(ET.XML(xml))
>>> sorted(d.items())
[('noun', 'World'), ('verb', 'Hello')]
"""
keys = [c.text for c in elem[::2]]
values = [parse_elem(c) for c in elem[1::2]]
return dict(zip(keys, values))
def parse_elem(elem):
"""Determine element type and dispatch to other parse functions."""
if len(elem) == 0:
return parse_text(elem)
if not elem[0].attrib:
return parse_list(elem)
if elem[0].get('Name') == 'Key':
return parse_key_value_dict(elem)
return parse_attrib_dict(elem)
def parse_xml(ohai_output):
r"""Parse XML Posh-Ohai output.
>>> output = '''\
... <?xml version="1.0"?>
... <Objects>
... <Object>
... <Property Name="Key">platform_family</Property>
... <Property Name="Value">Windows</Property>
... <Property Name="Key">logonhistory</Property>
... <Property Name="Value">
... <Property Name="Key">0x6dd0359</Property>
... <Property Name="Value">
... <Property Name="Key">user</Property>
... <Property Name="Value">WIN2008R2\\Administrator</Property>
... <Property Name="Key">logontype</Property>
... <Property Name="Value">10</Property>
... </Property>
... </Property>
... <Property Name="Key">loggedon_users</Property>
... <Property Name="Value">
... <Property>
... <Property Name="Session">995</Property>
... <Property Name="User">WIN2008R2\IUSR</Property>
... <Property Name="Type">Service</Property>
... </Property>
... <Property>
... <Property Name="Session">999</Property>
... <Property Name="User">WIN2008R2\SYSTEM</Property>
... <Property Name="Type">Local System</Property>
... </Property>
... </Property>
... </Object>
... </Objects>'''
>>> import pprint
>>> pprint.pprint(parse_xml(output))
{'loggedon_users': [{'Session': 995,
'Type': 'Service',
'User': 'WIN2008R2\\IUSR'},
{'Session': 999,
'Type': 'Local System',
'User': 'WIN2008R2\\SYSTEM'}],
'logonhistory': {'0x6dd0359': {'logontype': 10,
'user': 'WIN2008R2\\Administrator'}},
'platform_family': 'Windows'}
"""
try:
first = data.index('{')
last = data.rindex('}')
return data[first:last + 1]
except ValueError as exc:
context = {"ValueError": "%s" % exc}
raise errors.SystemInfoMissingJson(context)
root = ET.XML(ohai_output)
except ET.ParseError as err:
raise ValueError(err)
try:
properties = root[0]
except IndexError as err:
raise ValueError('XML had unexpected structure')
return parse_elem(properties)

View File

@ -0,0 +1,84 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Test PoSh-Ohai Plugin."""
import doctest
import unittest
import mock
from satori import errors
from satori.sysinfo import posh_ohai
from satori.tests import utils
def load_tests(loader, tests, ignore):
"""Include doctests as unit tests."""
tests.addTests(doctest.DocTestSuite(posh_ohai))
return tests
class TestSystemInfo(utils.TestCase):
def setUp(self):
super(TestSystemInfo, self).setUp()
self.client = mock.MagicMock()
self.client.is_windows.return_value = True
def test_system_info(self):
self.client.execute.return_value = "{}"
posh_ohai.system_info(self.client)
self.client.execute.assert_called_with("Get-ComputerConfiguration")
def test_system_info_json(self):
self.client.execute.return_value = '{"foo": 123}'
self.assertEqual(posh_ohai.system_info(self.client), {'foo': 123})
def test_system_info_json_with_motd(self):
self.client.execute.return_value = "Hello world\n {}"
self.assertEqual(posh_ohai.system_info(self.client), {})
def test_system_info_xml(self):
valid_xml = '''<Objects>"
<Object>"
<Property Name="Key">platform_family</Property>
<Property Name="Value">Windows</Property>
</Object>
</Objects>'''
self.client.execute.return_value = valid_xml
self.assertEqual(posh_ohai.system_info(self.client),
{'platform_family': 'Windows'})
def test_system_info_bad_json(self):
self.client.execute.return_value = "{Not JSON!}"
self.assertRaises(errors.SystemInfoInvalid,
posh_ohai.system_info, self.client)
def test_system_info_bad_xml(self):
self.client.execute.return_value = "<foo><bar>"
self.assertRaises(errors.SystemInfoInvalid,
posh_ohai.system_info, self.client)
def test_system_info_bad_xml(self):
self.client.execute.return_value = "<foo>bad structure</foo>"
self.assertRaises(errors.SystemInfoInvalid,
posh_ohai.system_info, self.client)
def test_system_info_invalid(self):
self.client.execute.return_value = "No JSON and not XML!"
self.assertRaises(errors.SystemInfoInvalid,
posh_ohai.system_info, self.client)
if __name__ == "__main__":
unittest.main()