Move Python ramdisk code out of tree

New home will be IPA, patch is already being worked on:
https://review.openstack.org/#/c/194116/

Anyway, we have to get rid of the ramdisk code before release.

Change-Id: I1b71a466059e70fd249712eaaf325efd459addcf
Closes-Bug: #1464708
This commit is contained in:
Dmitry Tantsur 2015-06-29 12:22:41 +02:00
parent 9654a066de
commit 931685d008
10 changed files with 1 additions and 909 deletions

View File

@ -1,4 +0,0 @@
#!/usr/bin/env python
from ironic_inspector_ramdisk import main
main.main()

View File

@ -1,257 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import logging
import os
import subprocess
import tarfile
import tempfile
import netifaces
import requests
LOG = logging.getLogger('ironic-inspector-ramdisk')
def try_call(*cmd, **kwargs):
strip = kwargs.pop('strip', True)
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.PIPE
try:
p = subprocess.Popen(cmd, **kwargs)
out, err = p.communicate()
except EnvironmentError as exc:
LOG.warn('command %s failed: %s', cmd, exc)
return
if p.returncode:
LOG.warn('command %s returned failure status %d:\n%s', cmd,
p.returncode, err.strip())
else:
return out.strip() if strip else out
def try_shell(sh, **kwargs):
strip = kwargs.pop('strip', True)
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.PIPE
kwargs['shell'] = True
p = subprocess.Popen([sh], **kwargs)
out, err = p.communicate()
if p.returncode:
LOG.warn('shell script "%s" failed with code %d:\n%s', sh,
p.returncode, err.strip())
else:
return out.strip() if strip else out
class AccumulatedFailure(object):
"""Object accumulated failures without raising exception."""
def __init__(self):
self._failures = []
def add(self, fail, *fmt):
"""Add failure with optional formatting."""
if fmt:
fail = fail % fmt
LOG.error('%s', fail)
self._failures.append(fail)
def get_error(self):
"""Get error string or None."""
if not self._failures:
return
msg = ('The following errors were encountered during '
'hardware discovery:\n%s'
% '\n'.join('* %s' % item for item in self._failures))
return msg
def __nonzero__(self):
return bool(self._failures)
__bool__ = __nonzero__
def __repr__(self): # pragma: no cover
# This is for tests
if self:
return '<%s: %s>' % (self.__class__.__name__,
', '.join(self._failures))
else:
return '<%s: success>' % self.__class__.__name__
def discover_basic_properties(data, args):
# These properties might not be present, we don't count it as failure
data['boot_interface'] = args.bootif
data['ipmi_address'] = try_shell(
"ipmitool lan print | grep -e 'IP Address [^S]' | awk '{ print $4 }'")
LOG.info('BMC IP address: %s', data['ipmi_address'])
def discover_network_interfaces(data, failures):
data.setdefault('interfaces', {})
for iface in netifaces.interfaces():
if iface.startswith('lo'):
LOG.info('ignoring local network interface %s', iface)
continue
LOG.debug('found network interface %s', iface)
addrs = netifaces.ifaddresses(iface)
try:
mac = addrs[netifaces.AF_LINK][0]['addr']
except (KeyError, IndexError):
LOG.info('no link information for interface %s in %s',
iface, addrs)
continue
try:
ip = addrs[netifaces.AF_INET][0]['addr']
except (KeyError, IndexError):
LOG.info('no IP address for interface %s', iface)
ip = None
data['interfaces'][iface] = {'mac': mac, 'ip': ip}
if data['interfaces']:
LOG.info('network interfaces: %s', data['interfaces'])
else:
failures.add('no network interfaces found')
def discover_scheduling_properties(data, failures):
scripts = [
('cpus', "grep processor /proc/cpuinfo | wc -l"),
('cpu_arch', "lscpu | grep Architecture | awk '{ print $2 }'"),
('local_gb', "fdisk -l | grep Disk | awk '{print $5}' | head -n 1"),
]
for key, script in scripts:
data[key] = try_shell(script)
LOG.info('value for "%s" field is %s', key, data[key])
ram_info = try_shell(
"dmidecode --type memory | grep Size | awk '{ print $2; }'")
if ram_info:
total_ram = 0
for ram_record in ram_info.split('\n'):
try:
total_ram += int(ram_record)
except ValueError:
pass
data['memory_mb'] = total_ram
LOG.info('total RAM: %s MiB', total_ram)
else:
LOG.warn('failed to get RAM information')
for key in ('cpus', 'local_gb', 'memory_mb'):
try:
data[key] = int(data[key])
except (KeyError, ValueError, TypeError):
LOG.warn('value for %s is missing or malformed: %s',
key, data.get(key))
data[key] = None
# FIXME(dtantsur): -1 is required to give Ironic some spacing for
# partitioning and may be removed later
if data['local_gb']:
data['local_gb'] = data['local_gb'] / 1024 / 1024 / 1024 - 1
if data['local_gb'] < 1:
LOG.warn('local_gb is less than 1 GiB')
data['local_gb'] = None
def discover_additional_properties(args, data, failures):
hw_args = ('--benchmark', 'cpu', 'disk', 'mem') if args.benchmark else ()
hw_json = try_call('hardware-detect', *hw_args)
if hw_json:
try:
data['data'] = json.loads(hw_json)
except ValueError:
LOG.error('JSON value returned from hardware-detect cannot be '
'decoded:\n%s', hw_json)
failures.add('unable to get extended hardware properties')
else:
failures.add('unable to get extended hardware properties')
def discover_block_devices(data):
block_devices = try_shell(
"lsblk -no TYPE,SERIAL | grep disk | awk '{print $2}'")
if not block_devices:
LOG.warn('unable to get block devices')
return
serials = [item for item in block_devices.split('\n') if item.strip()]
data['block_devices'] = {'serials': serials}
def discover_hardware(args, data, failures):
try_call('modprobe', 'ipmi_msghandler')
try_call('modprobe', 'ipmi_devintf')
try_call('modprobe', 'ipmi_si')
discover_basic_properties(data, args)
discover_network_interfaces(data, failures)
discover_scheduling_properties(data, failures)
if args.use_hardware_detect:
discover_additional_properties(args, data, failures)
discover_block_devices(data)
def call_inspector(args, data, failures):
data['error'] = failures.get_error()
LOG.info('posting collected data to %s', args.callback_url)
resp = requests.post(args.callback_url, data=json.dumps(data))
if resp.status_code >= 400:
LOG.error('inspector error %d: %s',
resp.status_code,
resp.content.decode('utf-8'))
resp.raise_for_status()
return resp.json()
def collect_logs(args):
files = {args.log_file} | set(args.system_log_file or ())
with tempfile.TemporaryFile() as fp:
with tarfile.open(fileobj=fp, mode='w:gz') as tar:
with tempfile.NamedTemporaryFile() as jrnl_fp:
if try_shell("journalctl > '%s'" % jrnl_fp.name) is not None:
tar.add(jrnl_fp.name, arcname='journal')
else:
LOG.warn('failed to get system journal')
for fname in files:
if os.path.exists(fname):
tar.add(fname)
else:
LOG.warn('log file %s does not exist', fname)
fp.seek(0)
return base64.b64encode(fp.read())
def setup_ipmi_credentials(resp):
user, password = resp['ipmi_username'], resp['ipmi_password']
if try_call('ipmitool', 'user', 'set', 'name', '2', user) is None:
raise RuntimeError('failed to set IPMI user name to %s', user)
if try_call('ipmitool', 'user', 'set', 'password', '2', password) is None:
raise RuntimeError('failed to set IPMI password')
try_call('ipmitool', 'user', 'enable', '2')
try_call('ipmitool', 'channel', 'setaccess', '1', '2',
'link=on', 'ipmi=on', 'callin=on', 'privilege=4')

View File

@ -1,93 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import sys
import requests
from ironic_inspector_ramdisk import discover
LOG = logging.getLogger('ironic-inspector-ramdisk')
def parse_args(args):
parser = argparse.ArgumentParser(description='Detect present hardware.')
parser.add_argument('-L', '--system-log-file', action='append',
help='System log file to be sent to inspector, may be '
'specified multiple times')
parser.add_argument('-l', '--log-file', default='discovery-logs',
help='Path to log file, defaults to ./discovery-logs')
parser.add_argument('--bootif', help='PXE boot interface')
# Support for edeploy plugin
parser.add_argument('--use-hardware-detect', action='store_true',
help='Use hardware-detect utility from '
'python-hardware package')
parser.add_argument('--benchmark', action='store_true',
help='Enables benchmarking for hardware-detect')
# ironic-inspector callback
parser.add_argument('callback_url',
help='Full ironic-inspector callback URL')
return parser.parse_args(args)
def setup_logging(args):
format = '%(asctime)s %(levelname)s: %(name)s: %(message)s'
logging.basicConfig(filename=args.log_file, filemode='w',
level=logging.DEBUG, format=format)
hnd = logging.StreamHandler()
hnd.setLevel(logging.WARNING)
formatter = logging.Formatter('%(levelname)s: %(message)s')
hnd.setFormatter(formatter)
logging.getLogger().addHandler(hnd)
def main():
args = parse_args(sys.argv[1:])
data = {}
setup_logging(args)
failures = discover.AccumulatedFailure()
try:
discover.discover_hardware(args, data, failures)
except Exception as exc:
LOG.exception('failed to discover data')
failures.add(exc)
try:
data['logs'] = discover.collect_logs(args)
except Exception:
LOG.exception('failed to collect logs')
call_error = True
resp = {}
try:
resp = discover.call_inspector(args, data, failures)
except requests.RequestException as exc:
LOG.error('%s when calling to inspector', exc)
except Exception:
LOG.exception('failed to call inspector')
else:
call_error = False
if resp.get('ipmi_setup_credentials'):
try:
discover.setup_ipmi_credentials(resp)
except Exception:
LOG.exception('failed to set IPMI credentials')
call_error = True
if failures or call_error:
sys.exit(1)

View File

@ -1,412 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import collections
import io
import os
import shutil
import subprocess
import tarfile
import tempfile
import unittest
try:
# mock library is buggy under Python 3.4, but we have a stdlib one
from unittest import mock
except ImportError:
import mock
import netifaces
import requests
from ironic_inspector_ramdisk import discover
def get_fake_args():
return mock.Mock(callback_url='url', daemonize_on_failure=True,
benchmark=None)
FAKE_ARGS = get_fake_args()
class TestCommands(unittest.TestCase):
@mock.patch.object(discover.LOG, 'warn', autospec=True)
@mock.patch.object(subprocess, 'Popen', autospec=True)
def test_try_call(self, mock_popen, mock_warn):
mock_popen.return_value.communicate.return_value = ('out', 'err')
mock_popen.return_value.returncode = 0
discover.try_call('ls', '-l')
mock_popen.assert_called_once_with(('ls', '-l'),
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
self.assertFalse(mock_warn.called)
@mock.patch.object(discover.LOG, 'warn', autospec=True)
@mock.patch.object(subprocess, 'Popen', autospec=True)
def test_try_call_fails(self, mock_popen, mock_warn):
mock_popen.return_value.communicate.return_value = ('out', 'err')
mock_popen.return_value.returncode = 42
discover.try_call('ls', '-l')
mock_popen.assert_called_once_with(('ls', '-l'),
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
mock_warn.assert_called_once_with(mock.ANY, ('ls', '-l'), 42, 'err')
@mock.patch.object(discover.LOG, 'warn', autospec=True)
def test_try_call_os_error(self, mock_warn):
discover.try_call('I don\'t exist!', '-l')
mock_warn.assert_called_once_with(mock.ANY, ('I don\'t exist!', '-l'),
mock.ANY)
@mock.patch.object(discover.LOG, 'warn', autospec=True)
def test_try_shell(self, mock_warn):
res = discover.try_shell('echo Hello; echo World')
self.assertEqual(b'Hello\nWorld', res)
self.assertFalse(mock_warn.called)
@mock.patch.object(discover.LOG, 'warn', autospec=True)
def test_try_shell_fails(self, mock_warn):
res = discover.try_shell('exit 1')
self.assertIsNone(res)
self.assertTrue(mock_warn.called)
@mock.patch.object(discover.LOG, 'warn', autospec=True)
def test_try_shell_no_strip(self, mock_warn):
res = discover.try_shell('echo Hello; echo World',
strip=False)
self.assertEqual(b'Hello\nWorld\n', res)
self.assertFalse(mock_warn.called)
class TestFailures(unittest.TestCase):
def test(self):
f = discover.AccumulatedFailure()
self.assertFalse(f)
self.assertIsNone(f.get_error())
f.add('foo')
f.add('%s', 'bar')
f.add(RuntimeError('baz'))
exp = ('The following errors were encountered during '
'hardware discovery:\n* foo\n* bar\n* baz')
self.assertEqual(exp, f.get_error())
self.assertTrue(f)
class BaseDiscoverTest(unittest.TestCase):
def setUp(self):
super(BaseDiscoverTest, self).setUp()
self.failures = discover.AccumulatedFailure()
self.data = {}
@mock.patch.object(discover, 'try_shell', autospec=True)
class TestDiscoverBasicProperties(BaseDiscoverTest):
def test(self, mock_shell):
mock_shell.return_value = '1.2.3.4'
discover.discover_basic_properties(
self.data, mock.Mock(bootif='boot:if'))
self.assertEqual({'ipmi_address': '1.2.3.4',
'boot_interface': 'boot:if'},
self.data)
@mock.patch.object(netifaces, 'ifaddresses', autospec=True)
@mock.patch.object(netifaces, 'interfaces', autospec=True)
class TestDiscoverNetworkInterfaces(BaseDiscoverTest):
def _call(self):
discover.discover_network_interfaces(self.data, self.failures)
def test_nothing(self, mock_ifaces, mock_ifaddr):
mock_ifaces.return_value = ['lo']
self._call()
mock_ifaces.assert_called_once_with()
self.assertFalse(mock_ifaddr.called)
self.assertIn('no network interfaces', self.failures.get_error())
self.assertEqual({'interfaces': {}}, self.data)
def test_ok(self, mock_ifaces, mock_ifaddr):
interfaces = [
{
netifaces.AF_LINK: [{'addr': '11:22:33:44:55:66'}],
netifaces.AF_INET: [{'addr': '1.2.3.4'}],
},
{
netifaces.AF_LINK: [{'addr': '11:22:33:44:55:44'}],
netifaces.AF_INET: [{'addr': '1.2.3.2'}],
},
]
mock_ifaces.return_value = ['lo', 'em1', 'em2']
mock_ifaddr.side_effect = iter(interfaces)
self._call()
mock_ifaddr.assert_any_call('em1')
mock_ifaddr.assert_any_call('em2')
self.assertEqual(2, mock_ifaddr.call_count)
self.assertEqual({'em1': {'mac': '11:22:33:44:55:66',
'ip': '1.2.3.4'},
'em2': {'mac': '11:22:33:44:55:44',
'ip': '1.2.3.2'}},
self.data['interfaces'])
self.assertFalse(self.failures)
def test_missing(self, mock_ifaces, mock_ifaddr):
interfaces = [
{
netifaces.AF_INET: [{'addr': '1.2.3.4'}],
},
{
netifaces.AF_LINK: [],
netifaces.AF_INET: [{'addr': '1.2.3.4'}],
},
{
netifaces.AF_LINK: [{'addr': '11:22:33:44:55:66'}],
netifaces.AF_INET: [],
},
{
netifaces.AF_LINK: [{'addr': '11:22:33:44:55:44'}],
},
]
mock_ifaces.return_value = ['lo', 'br0', 'br1', 'em1', 'em2']
mock_ifaddr.side_effect = iter(interfaces)
self._call()
self.assertEqual(4, mock_ifaddr.call_count)
self.assertEqual({'em1': {'mac': '11:22:33:44:55:66', 'ip': None},
'em2': {'mac': '11:22:33:44:55:44', 'ip': None}},
self.data['interfaces'])
self.assertFalse(self.failures)
@mock.patch.object(discover, 'try_shell', autospec=True)
class TestDiscoverSchedulingProperties(BaseDiscoverTest):
def test_ok(self, mock_shell):
mock_shell.side_effect = iter(('2', 'x86_64', '5368709120',
'1024\n1024\nno\n2048\n'))
discover.discover_scheduling_properties(self.data, self.failures)
self.assertFalse(self.failures)
self.assertEqual({'cpus': 2, 'cpu_arch': 'x86_64', 'local_gb': 4,
'memory_mb': 4096}, self.data)
def test_no_ram(self, mock_shell):
mock_shell.side_effect = iter(('2', 'x86_64', '5368709120', None))
discover.discover_scheduling_properties(self.data, self.failures)
self.assertFalse(self.failures)
self.assertEqual({'cpus': 2, 'cpu_arch': 'x86_64', 'local_gb': 4,
'memory_mb': None}, self.data)
def test_no_local_gb(self, mock_shell):
mock_shell.side_effect = iter(('2', 'x86_64', None,
'1024\n1024\nno\n2048\n'))
discover.discover_scheduling_properties(self.data, self.failures)
self.assertFalse(self.failures)
self.assertEqual({'cpus': 2, 'cpu_arch': 'x86_64', 'local_gb': None,
'memory_mb': 4096}, self.data)
def test_local_gb_too_small(self, mock_shell):
mock_shell.side_effect = iter(('2', 'x86_64', '42',
'1024\n1024\nno\n2048\n'))
discover.discover_scheduling_properties(self.data, self.failures)
self.assertFalse(self.failures)
self.assertEqual({'cpus': 2, 'cpu_arch': 'x86_64', 'local_gb': None,
'memory_mb': 4096}, self.data)
@mock.patch.object(discover, 'try_call')
class TestDiscoverAdditionalProperties(BaseDiscoverTest):
def test_ok(self, mock_call):
mock_call.return_value = '["prop1", "prop2"]'
discover.discover_additional_properties(
FAKE_ARGS, self.data, self.failures)
self.assertFalse(self.failures)
mock_call.assert_called_once_with('hardware-detect')
self.assertEqual(['prop1', 'prop2'], self.data['data'])
def test_failure(self, mock_call):
mock_call.return_value = None
discover.discover_additional_properties(
FAKE_ARGS, self.data, self.failures)
self.assertIn('unable to get extended hardware properties',
self.failures.get_error())
self.assertNotIn('data', self.data)
def test_not_json(self, mock_call):
mock_call.return_value = 'foo?'
discover.discover_additional_properties(
FAKE_ARGS, self.data, self.failures)
self.assertIn('unable to get extended hardware properties',
self.failures.get_error())
self.assertNotIn('data', self.data)
@mock.patch.object(discover, 'try_shell')
class TestDiscoverBlockDevices(BaseDiscoverTest):
def test_ok(self, mock_shell):
mock_shell.return_value = 'QM00005\nQM00006'
discover.discover_block_devices(self.data)
self.assertEqual({'serials': ['QM00005', 'QM00006']},
self.data['block_devices'])
def test_failure(self, mock_shell):
mock_shell.return_value = None
discover.discover_block_devices(self.data)
self.assertNotIn('block_devices', self.data)
@mock.patch.object(requests, 'post', autospec=True)
class TestCallDiscoverd(unittest.TestCase):
def test_ok(self, mock_post):
failures = discover.AccumulatedFailure()
data = collections.OrderedDict(data=42)
mock_post.return_value.status_code = 200
discover.call_inspector(FAKE_ARGS, data, failures)
mock_post.assert_called_once_with('url',
data='{"data": 42, "error": null}')
def test_send_failure(self, mock_post):
failures = mock.Mock(spec=discover.AccumulatedFailure)
failures.get_error.return_value = "boom"
data = collections.OrderedDict(data=42)
mock_post.return_value.status_code = 200
discover.call_inspector(FAKE_ARGS, data, failures)
mock_post.assert_called_once_with('url',
data='{"data": 42, "error": "boom"}')
def test_inspector_error(self, mock_post):
failures = discover.AccumulatedFailure()
data = collections.OrderedDict(data=42)
mock_post.return_value.status_code = 400
discover.call_inspector(FAKE_ARGS, data, failures)
mock_post.assert_called_once_with('url',
data='{"data": 42, "error": null}')
mock_post.return_value.raise_for_status.assert_called_once_with()
@mock.patch.object(discover, 'try_shell')
class TestCollectLogs(unittest.TestCase):
def _fake_journal_write(self, shell):
file_name = shell.rsplit(' ', 1)[1].strip("'")
with open(file_name, 'wb') as fp:
fp.write(b'journal contents')
return ""
def setUp(self):
super(TestCollectLogs, self).setUp()
temp_dir = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(temp_dir))
self.files = [os.path.join(temp_dir, fname)
for fname in ('main', 'log_1', 'log_2')]
for fname in self.files[:2]:
with open(fname, 'wb') as fp:
fp.write(fname.encode())
self.fake_args = get_fake_args()
self.fake_args.log_file = self.files[0]
self.fake_args.system_log_file = self.files[1:]
def test(self, mock_shell):
mock_shell.side_effect = self._fake_journal_write
res = discover.collect_logs(self.fake_args)
res = io.BytesIO(base64.b64decode(res))
with tarfile.open(fileobj=res) as tar:
members = list(sorted((m.name, m.size) for m in tar))
self.assertEqual(
[('journal', 16)] +
list(sorted((name[1:], len(name)) for name in self.files[:2])),
members)
def test_no_journal(self, mock_shell):
mock_shell.return_value = None
res = discover.collect_logs(self.fake_args)
res = io.BytesIO(base64.b64decode(res))
with tarfile.open(fileobj=res) as tar:
members = list(sorted((m.name, m.size) for m in tar))
self.assertEqual(
list(sorted((name[1:], len(name)) for name in self.files[:2])),
members)
@mock.patch.object(discover, 'try_call', autospec=True)
class TestSetupIpmiCredentials(unittest.TestCase):
def setUp(self):
super(TestSetupIpmiCredentials, self).setUp()
self.resp = {'ipmi_username': 'user',
'ipmi_password': 'pwd'}
def test_ok(self, mock_call):
mock_call.return_value = ""
discover.setup_ipmi_credentials(self.resp)
mock_call.assert_any_call('ipmitool', 'user', 'set', 'name',
'2', 'user')
mock_call.assert_any_call('ipmitool', 'user', 'set', 'password',
'2', 'pwd')
mock_call.assert_any_call('ipmitool', 'user', 'enable', '2')
mock_call.assert_any_call('ipmitool', 'channel', 'setaccess', '1', '2',
'link=on', 'ipmi=on', 'callin=on',
'privilege=4')
def test_user_failed(self, mock_call):
mock_call.return_value = None
self.assertRaises(RuntimeError, discover.setup_ipmi_credentials,
self.resp)
mock_call.assert_called_once_with('ipmitool', 'user', 'set', 'name',
'2', 'user')
def test_password_failed(self, mock_call):
mock_call.side_effect = iter(("", None))
self.assertRaises(RuntimeError, discover.setup_ipmi_credentials,
self.resp)
mock_call.assert_any_call('ipmitool', 'user', 'set', 'name',
'2', 'user')
mock_call.assert_any_call('ipmitool', 'user', 'set', 'password',
'2', 'pwd')

View File

@ -1,136 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import mock
import requests
from ironic_inspector_ramdisk import discover
from ironic_inspector_ramdisk import main
from ironic_inspector_ramdisk.test import test_discover
FAKE_ARGS = test_discover.get_fake_args()
class TestParseArgs(unittest.TestCase):
def test(self):
args = ['http://url']
parsed_args = main.parse_args(args)
self.assertEqual('http://url', parsed_args.callback_url)
def test_log_files(self):
args = ['-L', 'log1', '-L', 'log2', 'url']
parsed_args = main.parse_args(args)
self.assertEqual(['log1', 'log2'],
parsed_args.system_log_file)
@mock.patch.object(main, 'setup_logging', lambda args: None)
@mock.patch.object(main, 'parse_args', return_value=FAKE_ARGS,
autospec=True)
@mock.patch.object(discover, 'setup_ipmi_credentials', autospec=True)
@mock.patch.object(discover, 'call_inspector', autospec=True,
return_value={})
@mock.patch.object(discover, 'collect_logs', autospec=True)
@mock.patch.object(discover, 'discover_hardware', autospec=True)
class TestMain(unittest.TestCase):
def test_success(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_parse):
mock_logs.return_value = 'LOG'
main.main()
# FIXME(dtantsur): mock does not copy arguments, so the 2nd argument
# actually is not what we expect ({})
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {'logs': 'LOG'},
mock.ANY)
self.assertFalse(mock_setup_ipmi.called)
def test_discover_fails(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_parse):
mock_logs.return_value = 'LOG'
mock_discover.side_effect = RuntimeError('boom')
self.assertRaisesRegexp(SystemExit, '1', main.main)
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {'logs': 'LOG'},
mock.ANY)
failures = mock_callback.call_args[0][2]
self.assertIn('boom', failures.get_error())
def test_collect_logs_fails(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_parse):
mock_logs.side_effect = RuntimeError('boom')
main.main()
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {}, mock.ANY)
def test_callback_fails(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_parse):
mock_logs.return_value = 'LOG'
mock_callback.side_effect = requests.HTTPError('boom')
self.assertRaisesRegexp(SystemExit, '1', main.main)
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {'logs': 'LOG'},
mock.ANY)
def test_callback_fails2(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_parse):
mock_logs.return_value = 'LOG'
mock_callback.side_effect = RuntimeError('boom')
self.assertRaisesRegexp(SystemExit, '1', main.main)
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {'logs': 'LOG'},
mock.ANY)
def test_setup_ipmi(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_parse):
mock_logs.return_value = 'LOG'
mock_callback.return_value = {'ipmi_setup_credentials': True}
main.main()
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {'logs': 'LOG'},
mock.ANY)
mock_setup_ipmi.assert_called_once_with(mock_callback.return_value)
def test_setup_ipmi_fails(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_parse):
mock_logs.return_value = 'LOG'
mock_callback.return_value = {'ipmi_setup_credentials': True}
mock_setup_ipmi.side_effect = RuntimeError('boom')
self.assertRaisesRegexp(SystemExit, '1', main.main)
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {'logs': 'LOG'},
mock.ANY)
mock_setup_ipmi.assert_called_once_with(mock_callback.return_value)

View File

@ -6,12 +6,10 @@ cliff>=1.13.0 # Apache-2.0
eventlet>=0.17.4
Flask<1.0,>=0.10
keystonemiddleware>=1.5.0
netifaces>=0.10.4
pbr<2.0,>=0.11
python-ironicclient>=0.2.1
python-keystoneclient>=1.6.0
python-openstackclient>=1.0.3
requests>=2.5.2
oslo.config>=1.11.0 # Apache-2.0
oslo.i18n>=1.5.0 # Apache-2.0
oslo.utils>=1.6.0 # Apache-2.0

View File

@ -18,9 +18,6 @@ classifier =
[files]
packages =
ironic_inspector
ironic_inspector_ramdisk
scripts =
bin/ironic-inspector-ramdisk
[entry_points]
console_scripts =

View File

@ -9,7 +9,6 @@ deps =
-r{toxinidir}/plugin-requirements.txt
commands =
coverage run --branch --include "ironic_inspector*" -m unittest discover ironic_inspector.test
coverage run --branch --include "ironic_inspector_ramdisk*" -a -m unittest discover ironic_inspector_ramdisk.test
coverage report -m --fail-under 90
setenv = PYTHONDONTWRITEBYTECODE=1
passenv = http_proxy HTTP_PROXY https_proxy HTTPS_PROXY no_proxy NO_PROXY
@ -24,7 +23,7 @@ deps =
-r{toxinidir}/test-requirements.txt
-r{toxinidir}/plugin-requirements.txt
commands =
flake8 ironic_inspector ironic_inspector_ramdisk
flake8 ironic_inspector
doc8 README.rst CONTRIBUTING.rst HTTP-API.rst
[flake8]