Initial import of ironic_discoverd_ramdisk

This is Python script for implementing a full-featured ironic-discoverd
ramdisk, including support for 'edeploy' plugin. It was initially
developed downstream.

Missing in this patch are:
* support for updating IPMI credentials
* support for serving logs via HTTP

Change-Id: I6987d46d7b6a1411d0d38f97373e5cd036ba6162
Implements: bluepring python-ramdisk-code
This commit is contained in:
Dmitry Tantsur 2015-04-24 15:13:18 +02:00
parent c589e8ed6c
commit 48471bce6d
10 changed files with 857 additions and 2 deletions

View File

@ -0,0 +1,4 @@
#!/usr/bin/env python
from ironic_discoverd_ramdisk import main
main.main()

View File

View File

@ -0,0 +1,248 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import logging
import os
import subprocess
import tarfile
import tempfile
import netifaces
import requests
LOG = logging.getLogger('ironic-discoverd-ramdisk')
def try_call(*cmd, **kwargs):
strip = kwargs.pop('strip', True)
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.PIPE
try:
p = subprocess.Popen(cmd, **kwargs)
out, err = p.communicate()
except EnvironmentError as exc:
LOG.warn('command %s failed: %s', cmd, exc)
return
if p.returncode:
LOG.warn('command %s returned failure status %d:\n%s', cmd,
p.returncode, err.strip())
else:
return out.strip() if strip else out
def try_shell(sh, **kwargs):
strip = kwargs.pop('strip', True)
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.PIPE
kwargs['shell'] = True
p = subprocess.Popen([sh], **kwargs)
out, err = p.communicate()
if p.returncode:
LOG.warn('shell script "%s" failed with code %d:\n%s', sh,
p.returncode, err.strip())
else:
return out.strip() if strip else out
class AccumulatedFailure(object):
"""Object accumulated failures without raising exception."""
def __init__(self):
self._failures = []
def add(self, fail, *fmt):
"""Add failure with optional formatting."""
if fmt:
fail = fail % fmt
LOG.error('%s', fail)
self._failures.append(fail)
def get_error(self):
"""Get error string or None."""
if not self._failures:
return
msg = ('The following errors were encountered during '
'hardware discovery:\n%s'
% '\n'.join('* %s' % item for item in self._failures))
return msg
def __nonzero__(self):
return bool(self._failures)
__bool__ = __nonzero__
def __repr__(self): # pragma: no cover
# This is for tests
if self:
return '<%s: %s>' % (self.__class__.__name__,
', '.join(self._failures))
else:
return '<%s: success>' % self.__class__.__name__
def discover_basic_properties(data, args):
# These properties might not be present, we don't count it as failure
data['boot_interface'] = args.bootif
data['ipmi_address'] = try_shell(
"ipmitool lan print | grep -e 'IP Address [^S]' | awk '{ print $4 }'")
LOG.info('BMC IP address: %s', data['ipmi_address'])
def discover_network_interfaces(data, failures):
data.setdefault('interfaces', {})
for iface in netifaces.interfaces():
if iface.startswith('lo'):
LOG.info('ignoring local network interface %s', iface)
continue
LOG.debug('found network interface %s', iface)
addrs = netifaces.ifaddresses(iface)
try:
mac = addrs[netifaces.AF_LINK][0]['addr']
except (KeyError, IndexError):
LOG.info('no link information for interface %s in %s',
iface, addrs)
continue
try:
ip = addrs[netifaces.AF_INET][0]['addr']
except (KeyError, IndexError):
LOG.info('no IP address for interface %s', iface)
ip = None
data['interfaces'][iface] = {'mac': mac, 'ip': ip}
if data['interfaces']:
LOG.info('network interfaces: %s', data['interfaces'])
else:
failures.add('no network interfaces found')
def discover_scheduling_properties(data, failures):
scripts = [
('cpus', "grep processor /proc/cpuinfo | wc -l"),
('cpu_arch', "lscpu | grep Architecture | awk '{ print $2 }'"),
('local_gb', "fdisk -l | grep Disk | awk '{print $5}' | head -n 1"),
]
for key, script in scripts:
data[key] = try_shell(script)
LOG.info('value for "%s" field is %s', key, data[key])
ram_info = try_shell(
"dmidecode --type memory | grep Size | awk '{ print $2; }'")
if ram_info:
total_ram = 0
for ram_record in ram_info.split('\n'):
try:
total_ram += int(ram_record)
except ValueError:
pass
data['memory_mb'] = total_ram
LOG.info('total RAM: %s MiB', total_ram)
else:
failures.add('failed to get RAM information')
for key in ('cpus', 'local_gb', 'memory_mb'):
try:
data[key] = int(data[key])
except (KeyError, ValueError, TypeError):
failures.add('value for %s is missing or malformed: %s',
key, data.get(key))
data[key] = None
# FIXME(dtantsur): -1 is required to give Ironic some spacing for
# partitioning and may be removed later
if data['local_gb']:
data['local_gb'] = data['local_gb'] / 1024 / 1024 / 1024 - 1
if data['local_gb'] < 1:
failures.add('local_gb is less than 1 GiB')
data['local_gb'] = None
def discover_additional_properties(args, data, failures):
hw_args = ('--benchmark', 'cpu', 'disk', 'mem') if args.benchmark else ()
hw_json = try_call('hardware-detect', *hw_args)
if hw_json:
try:
data['data'] = json.loads(hw_json)
except ValueError:
LOG.error('JSON value returned from hardware-detect cannot be '
'decoded:\n%s', hw_json)
failures.add('unable to get extended hardware properties')
else:
failures.add('unable to get extended hardware properties')
def discover_block_devices(data):
block_devices = try_shell(
"lsblk -no TYPE,SERIAL | grep disk | awk '{print $2}'")
if not block_devices:
LOG.warn('unable to get block devices')
return
serials = [item for item in block_devices.split('\n') if item.strip()]
data['block_devices'] = {'serials': serials}
def discover_hardware(args, data, failures):
try_call('modprobe', 'ipmi_msghandler')
try_call('modprobe', 'ipmi_devintf')
try_call('modprobe', 'ipmi_si')
discover_basic_properties(data, args)
discover_network_interfaces(data, failures)
discover_scheduling_properties(data, failures)
if args.use_hardware_detect:
discover_additional_properties(args, data, failures)
discover_block_devices(data)
def call_discoverd(args, data, failures):
data['error'] = failures.get_error()
LOG.info('posting collected data to %s', args.callback_url)
resp = requests.post(args.callback_url, data=json.dumps(data))
if resp.status_code >= 400:
LOG.error('discoverd error %d: %s',
resp.status_code,
resp.content.decode('utf-8'))
resp.raise_for_status()
return resp.json()
def collect_logs(args):
files = {args.log_file} | set(args.system_log_file or ())
with tempfile.TemporaryFile() as fp:
with tarfile.open(fileobj=fp, mode='w:gz') as tar:
for fname in files:
if os.path.exists(fname):
tar.add(fname)
else:
LOG.warn('Log file %s does not exist', fname)
fp.seek(0)
return base64.b64encode(fp.read())
def setup_ipmi_credentials(resp):
pass # TODO(dtantsur): implement
def fork_and_serve_logs(args):
pass # TODO(dtantsur): implement

View File

@ -0,0 +1,100 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import sys
import requests
from ironic_discoverd_ramdisk import discover
LOG = logging.getLogger('ironic-discoverd-ramdisk')
def parse_args(args):
parser = argparse.ArgumentParser(description='Detect present hardware.')
parser.add_argument('-p', '--port', type=int, default=8080,
help='Port to serve logs over HTTP')
parser.add_argument('-L', '--system-log-file', action='append',
help='System log file to be sent to discoverd, may be '
'specified multiple times')
parser.add_argument('-l', '--log-file', default='discovery-logs',
help='Path to log file, defaults to ./discovery-logs')
parser.add_argument('-d', '--daemonize-on-failure', action='store_true',
help='In case of failure, fork off, continue running '
'and serve logs via port set by --port')
parser.add_argument('--bootif', help='PXE boot interface')
# Support for edeploy plugin
parser.add_argument('--use-hardware-detect', action='store_true',
help='Use hardware-detect utility from '
'python-hardware package')
parser.add_argument('--benchmark', action='store_true',
help='Enables benchmarking for hardware-detect')
# ironic-discoverd callback
parser.add_argument('callback_url',
help='Full ironic-discoverd callback URL')
return parser.parse_args(args)
def setup_logging(args):
format = '%(asctime)s %(levelname)s: %(name)s: %(message)s'
logging.basicConfig(filename=args.log_file, filemode='w',
level=logging.DEBUG, format=format)
hnd = logging.StreamHandler()
hnd.setLevel(logging.WARNING)
formatter = logging.Formatter('%(levelname)s: %(message)s')
hnd.setFormatter(formatter)
logging.getLogger().addHandler(hnd)
def main():
args = parse_args(sys.argv[1:])
data = {}
setup_logging(args)
failures = discover.AccumulatedFailure()
try:
discover.discover_hardware(args, data, failures)
except Exception as exc:
LOG.exception('failed to discover data')
failures.add(exc)
try:
data['logs'] = discover.collect_logs(args)
except Exception:
LOG.exception('failed to collect logs')
call_error = True
resp = {}
try:
resp = discover.call_discoverd(args, data, failures)
except requests.RequestException as exc:
LOG.error('%s when calling to discoverd', exc)
except Exception:
LOG.exception('failed to call discoverd')
else:
call_error = False
if resp.get('ipmi_setup_credentials'):
try:
discover.setup_ipmi_credentials(resp)
except Exception:
LOG.exception('failed to set IPMI credentials')
call_error = True
if failures or call_error:
if args.daemonize_on_failure:
discover.fork_and_serve_logs(args)
sys.exit(1)

View File

@ -0,0 +1,336 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import collections
import io
import os
import shutil
import subprocess
import tarfile
import tempfile
import unittest
try:
# mock library is buggy under Python 3.4, but we have a stdlib one
from unittest import mock
except ImportError:
import mock
import netifaces
import requests
from ironic_discoverd_ramdisk import discover
def get_fake_args():
return mock.Mock(callback_url='url', daemonize_on_failure=True,
benchmark=None)
FAKE_ARGS = get_fake_args()
class TestCommands(unittest.TestCase):
@mock.patch.object(discover.LOG, 'warn', autospec=True)
@mock.patch.object(subprocess, 'Popen', autospec=True)
def test_try_call(self, mock_popen, mock_warn):
mock_popen.return_value.communicate.return_value = ('out', 'err')
mock_popen.return_value.returncode = 0
discover.try_call('ls', '-l')
mock_popen.assert_called_once_with(('ls', '-l'),
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
self.assertFalse(mock_warn.called)
@mock.patch.object(discover.LOG, 'warn', autospec=True)
@mock.patch.object(subprocess, 'Popen', autospec=True)
def test_try_call_fails(self, mock_popen, mock_warn):
mock_popen.return_value.communicate.return_value = ('out', 'err')
mock_popen.return_value.returncode = 42
discover.try_call('ls', '-l')
mock_popen.assert_called_once_with(('ls', '-l'),
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
mock_warn.assert_called_once_with(mock.ANY, ('ls', '-l'), 42, 'err')
@mock.patch.object(discover.LOG, 'warn', autospec=True)
def test_try_call_os_error(self, mock_warn):
discover.try_call('I don\'t exist!', '-l')
mock_warn.assert_called_once_with(mock.ANY, ('I don\'t exist!', '-l'),
mock.ANY)
@mock.patch.object(discover.LOG, 'warn', autospec=True)
def test_try_shell(self, mock_warn):
res = discover.try_shell('echo Hello; echo World')
self.assertEqual(b'Hello\nWorld', res)
self.assertFalse(mock_warn.called)
@mock.patch.object(discover.LOG, 'warn', autospec=True)
def test_try_shell_fails(self, mock_warn):
res = discover.try_shell('exit 1')
self.assertIsNone(res)
self.assertTrue(mock_warn.called)
@mock.patch.object(discover.LOG, 'warn', autospec=True)
def test_try_shell_no_strip(self, mock_warn):
res = discover.try_shell('echo Hello; echo World',
strip=False)
self.assertEqual(b'Hello\nWorld\n', res)
self.assertFalse(mock_warn.called)
class TestFailures(unittest.TestCase):
def test(self):
f = discover.AccumulatedFailure()
self.assertFalse(f)
self.assertIsNone(f.get_error())
f.add('foo')
f.add('%s', 'bar')
f.add(RuntimeError('baz'))
exp = ('The following errors were encountered during '
'hardware discovery:\n* foo\n* bar\n* baz')
self.assertEqual(exp, f.get_error())
self.assertTrue(f)
class BaseDiscoverTest(unittest.TestCase):
def setUp(self):
super(BaseDiscoverTest, self).setUp()
self.failures = discover.AccumulatedFailure()
self.data = {}
@mock.patch.object(discover, 'try_shell', autospec=True)
class TestDiscoverBasicProperties(BaseDiscoverTest):
def test(self, mock_shell):
mock_shell.return_value = '1.2.3.4'
discover.discover_basic_properties(
self.data, mock.Mock(bootif='boot:if'))
self.assertEqual({'ipmi_address': '1.2.3.4',
'boot_interface': 'boot:if'},
self.data)
@mock.patch.object(netifaces, 'ifaddresses', autospec=True)
@mock.patch.object(netifaces, 'interfaces', autospec=True)
class TestDiscoverNetworkInterfaces(BaseDiscoverTest):
def _call(self):
discover.discover_network_interfaces(self.data, self.failures)
def test_nothing(self, mock_ifaces, mock_ifaddr):
mock_ifaces.return_value = ['lo']
self._call()
mock_ifaces.assert_called_once_with()
self.assertFalse(mock_ifaddr.called)
self.assertIn('no network interfaces', self.failures.get_error())
self.assertEqual({'interfaces': {}}, self.data)
def test_ok(self, mock_ifaces, mock_ifaddr):
interfaces = [
{
netifaces.AF_LINK: [{'addr': '11:22:33:44:55:66'}],
netifaces.AF_INET: [{'addr': '1.2.3.4'}],
},
{
netifaces.AF_LINK: [{'addr': '11:22:33:44:55:44'}],
netifaces.AF_INET: [{'addr': '1.2.3.2'}],
},
]
mock_ifaces.return_value = ['lo', 'em1', 'em2']
mock_ifaddr.side_effect = iter(interfaces)
self._call()
mock_ifaddr.assert_any_call('em1')
mock_ifaddr.assert_any_call('em2')
self.assertEqual(2, mock_ifaddr.call_count)
self.assertEqual({'em1': {'mac': '11:22:33:44:55:66',
'ip': '1.2.3.4'},
'em2': {'mac': '11:22:33:44:55:44',
'ip': '1.2.3.2'}},
self.data['interfaces'])
self.assertFalse(self.failures)
def test_missing(self, mock_ifaces, mock_ifaddr):
interfaces = [
{
netifaces.AF_INET: [{'addr': '1.2.3.4'}],
},
{
netifaces.AF_LINK: [],
netifaces.AF_INET: [{'addr': '1.2.3.4'}],
},
{
netifaces.AF_LINK: [{'addr': '11:22:33:44:55:66'}],
netifaces.AF_INET: [],
},
{
netifaces.AF_LINK: [{'addr': '11:22:33:44:55:44'}],
},
]
mock_ifaces.return_value = ['lo', 'br0', 'br1', 'em1', 'em2']
mock_ifaddr.side_effect = iter(interfaces)
self._call()
self.assertEqual(4, mock_ifaddr.call_count)
self.assertEqual({'em1': {'mac': '11:22:33:44:55:66', 'ip': None},
'em2': {'mac': '11:22:33:44:55:44', 'ip': None}},
self.data['interfaces'])
self.assertFalse(self.failures)
@mock.patch.object(discover, 'try_shell', autospec=True)
class TestDiscoverSchedulingProperties(BaseDiscoverTest):
def test_ok(self, mock_shell):
mock_shell.side_effect = iter(('2', 'x86_64', '5368709120',
'1024\n1024\nno\n2048\n'))
discover.discover_scheduling_properties(self.data, self.failures)
self.assertFalse(self.failures)
self.assertEqual({'cpus': 2, 'cpu_arch': 'x86_64', 'local_gb': 4,
'memory_mb': 4096}, self.data)
def test_no_ram(self, mock_shell):
mock_shell.side_effect = iter(('2', 'x86_64', '5368709120', None))
discover.discover_scheduling_properties(self.data, self.failures)
self.assertIn('failed to get RAM', self.failures.get_error())
self.assertEqual({'cpus': 2, 'cpu_arch': 'x86_64', 'local_gb': 4,
'memory_mb': None}, self.data)
def test_local_gb_too_small(self, mock_shell):
mock_shell.side_effect = iter(('2', 'x86_64', '42',
'1024\n1024\nno\n2048\n'))
discover.discover_scheduling_properties(self.data, self.failures)
self.assertIn('local_gb is less than 1 GiB', self.failures.get_error())
self.assertEqual({'cpus': 2, 'cpu_arch': 'x86_64', 'local_gb': None,
'memory_mb': 4096}, self.data)
@mock.patch.object(discover, 'try_call')
class TestDiscoverAdditionalProperties(BaseDiscoverTest):
def test_ok(self, mock_call):
mock_call.return_value = '["prop1", "prop2"]'
discover.discover_additional_properties(
FAKE_ARGS, self.data, self.failures)
self.assertFalse(self.failures)
mock_call.assert_called_once_with('hardware-detect')
self.assertEqual(['prop1', 'prop2'], self.data['data'])
def test_failure(self, mock_call):
mock_call.return_value = None
discover.discover_additional_properties(
FAKE_ARGS, self.data, self.failures)
self.assertIn('unable to get extended hardware properties',
self.failures.get_error())
self.assertNotIn('data', self.data)
def test_not_json(self, mock_call):
mock_call.return_value = 'foo?'
discover.discover_additional_properties(
FAKE_ARGS, self.data, self.failures)
self.assertIn('unable to get extended hardware properties',
self.failures.get_error())
self.assertNotIn('data', self.data)
@mock.patch.object(discover, 'try_shell')
class TestDiscoverBlockDevices(BaseDiscoverTest):
def test_ok(self, mock_shell):
mock_shell.return_value = 'QM00005\nQM00006'
discover.discover_block_devices(self.data)
self.assertEqual({'serials': ['QM00005', 'QM00006']},
self.data['block_devices'])
def test_failure(self, mock_shell):
mock_shell.return_value = None
discover.discover_block_devices(self.data)
self.assertNotIn('block_devices', self.data)
@mock.patch.object(requests, 'post', autospec=True)
class TestCallDiscoverd(unittest.TestCase):
def test_ok(self, mock_post):
failures = discover.AccumulatedFailure()
data = collections.OrderedDict(data=42)
mock_post.return_value.status_code = 200
discover.call_discoverd(FAKE_ARGS, data, failures)
mock_post.assert_called_once_with('url',
data='{"data": 42, "error": null}')
def test_send_failure(self, mock_post):
failures = mock.Mock(spec=discover.AccumulatedFailure)
failures.get_error.return_value = "boom"
data = collections.OrderedDict(data=42)
mock_post.return_value.status_code = 200
discover.call_discoverd(FAKE_ARGS, data, failures)
mock_post.assert_called_once_with('url',
data='{"data": 42, "error": "boom"}')
def test_discoverd_error(self, mock_post):
failures = discover.AccumulatedFailure()
data = collections.OrderedDict(data=42)
mock_post.return_value.status_code = 400
discover.call_discoverd(FAKE_ARGS, data, failures)
mock_post.assert_called_once_with('url',
data='{"data": 42, "error": null}')
mock_post.return_value.raise_for_status.assert_called_once_with()
class TestCollectLogs(unittest.TestCase):
def test(self):
temp_dir = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(temp_dir))
files = [os.path.join(temp_dir, fname)
for fname in ('main', 'log_1', 'log_2')]
for fname in files[:2]:
with open(fname, 'wb') as fp:
fp.write(fname.encode())
fake_args = get_fake_args()
fake_args.log_file = files[0]
fake_args.system_log_file = files[1:]
res = discover.collect_logs(fake_args)
res = io.BytesIO(base64.b64decode(res))
with tarfile.open(fileobj=res) as tar:
members = list(sorted((m.name, m.size) for m in tar))
self.assertEqual(
list(sorted((name[1:], len(name)) for name in files[:2])),
members)

View File

@ -0,0 +1,162 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import mock
import requests
from ironic_discoverd_ramdisk import discover
from ironic_discoverd_ramdisk import main
from ironic_discoverd_ramdisk.test import test_discover
FAKE_ARGS = test_discover.get_fake_args()
class TestParseArgs(unittest.TestCase):
def test(self):
args = ['-d', 'http://url']
parsed_args = main.parse_args(args)
self.assertEqual('http://url', parsed_args.callback_url)
self.assertTrue(parsed_args.daemonize_on_failure)
def test_log_files(self):
args = ['-L', 'log1', '-L', 'log2', 'url']
parsed_args = main.parse_args(args)
self.assertEqual(['log1', 'log2'],
parsed_args.system_log_file)
@mock.patch.object(main, 'setup_logging', lambda args: None)
@mock.patch.object(main, 'parse_args', return_value=FAKE_ARGS,
autospec=True)
@mock.patch.object(discover, 'fork_and_serve_logs', autospec=True)
@mock.patch.object(discover, 'setup_ipmi_credentials', autospec=True)
@mock.patch.object(discover, 'call_discoverd', autospec=True,
return_value={})
@mock.patch.object(discover, 'collect_logs', autospec=True)
@mock.patch.object(discover, 'discover_hardware', autospec=True)
class TestMain(unittest.TestCase):
def test_success(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_fork_serve, mock_parse):
mock_logs.return_value = 'LOG'
main.main()
# FIXME(dtantsur): mock does not copy arguments, so the 2nd argument
# actually is not what we expect ({})
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {'logs': 'LOG'},
mock.ANY)
self.assertFalse(mock_fork_serve.called)
self.assertFalse(mock_setup_ipmi.called)
def test_discover_fails(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_fork_serve, mock_parse):
mock_logs.return_value = 'LOG'
mock_discover.side_effect = RuntimeError('boom')
self.assertRaisesRegexp(SystemExit, '1', main.main)
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {'logs': 'LOG'},
mock.ANY)
failures = mock_callback.call_args[0][2]
self.assertIn('boom', failures.get_error())
mock_fork_serve.assert_called_once_with(FAKE_ARGS)
def test_collect_logs_fails(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_fork_serve, mock_parse):
mock_logs.side_effect = RuntimeError('boom')
main.main()
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {}, mock.ANY)
self.assertFalse(mock_fork_serve.called)
def test_callback_fails(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_fork_serve, mock_parse):
mock_logs.return_value = 'LOG'
mock_callback.side_effect = requests.HTTPError('boom')
self.assertRaisesRegexp(SystemExit, '1', main.main)
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {'logs': 'LOG'},
mock.ANY)
mock_fork_serve.assert_called_once_with(FAKE_ARGS)
def test_callback_fails2(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_fork_serve, mock_parse):
mock_logs.return_value = 'LOG'
mock_callback.side_effect = RuntimeError('boom')
self.assertRaisesRegexp(SystemExit, '1', main.main)
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {'logs': 'LOG'},
mock.ANY)
mock_fork_serve.assert_called_once_with(FAKE_ARGS)
def test_no_daemonize(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_fork_serve, mock_parse):
new_fake_args = test_discover.get_fake_args()
new_fake_args.daemonize_on_failure = None
mock_parse.return_value = new_fake_args
mock_logs.return_value = 'LOG'
mock_callback.side_effect = RuntimeError('boom')
self.assertRaisesRegexp(SystemExit, '1', main.main)
mock_discover.assert_called_once_with(new_fake_args, mock.ANY,
mock.ANY)
mock_logs.assert_called_once_with(new_fake_args)
mock_callback.assert_called_once_with(new_fake_args, {'logs': 'LOG'},
mock.ANY)
self.assertFalse(mock_fork_serve.called)
def test_setup_ipmi(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_fork_serve, mock_parse):
mock_logs.return_value = 'LOG'
mock_callback.return_value = {'ipmi_setup_credentials': True}
main.main()
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {'logs': 'LOG'},
mock.ANY)
mock_setup_ipmi.assert_called_once_with(mock_callback.return_value)
self.assertFalse(mock_fork_serve.called)
def test_setup_ipmi_fails(self, mock_discover, mock_logs, mock_callback,
mock_setup_ipmi, mock_fork_serve, mock_parse):
mock_logs.return_value = 'LOG'
mock_callback.return_value = {'ipmi_setup_credentials': True}
mock_setup_ipmi.side_effect = RuntimeError('boom')
self.assertRaisesRegexp(SystemExit, '1', main.main)
mock_discover.assert_called_once_with(FAKE_ARGS, mock.ANY, mock.ANY)
mock_logs.assert_called_once_with(FAKE_ARGS)
mock_callback.assert_called_once_with(FAKE_ARGS, {'logs': 'LOG'},
mock.ANY)
mock_setup_ipmi.assert_called_once_with(mock_callback.return_value)
mock_fork_serve.assert_called_once_with(FAKE_ARGS)

View File

@ -6,6 +6,7 @@ cliff>=1.10.0,<1.11.0 # Apache-2.0
eventlet>=0.16.1,!=0.17.0
Flask>=0.10,<1.0
keystonemiddleware>=1.5.0
netifaces>=0.10.4
python-ironicclient>=0.2.1
python-keystoneclient>=1.1.0
python-openstackclient>=1.0.0

View File

@ -25,8 +25,11 @@ setup(
author_email = "dtantsur@redhat.com",
url = "https://pypi.python.org/pypi/ironic-discoverd",
packages = ['ironic_discoverd', 'ironic_discoverd.plugins',
'ironic_discoverd.test', 'ironic_discoverd.common'],
'ironic_discoverd.test', 'ironic_discoverd.common',
'ironic_discoverd_ramdisk', 'ironic_discoverd_ramdisk.test'],
install_requires = install_requires,
# because entry points don't work with multiple packages
scripts = ['bin/ironic-discoverd-ramdisk'],
entry_points = {
'console_scripts': [
"ironic-discoverd = ironic_discoverd.main:main",

View File

@ -9,6 +9,7 @@ deps =
-r{toxinidir}/plugin-requirements.txt
commands =
coverage run --branch --include "ironic_discoverd*" -m unittest discover ironic_discoverd.test
coverage run --branch --include "ironic_discoverd_ramdisk*" -a -m unittest discover ironic_discoverd_ramdisk.test
coverage report -m --fail-under 90
setenv = PYTHONDONTWRITEBYTECODE=1
@ -19,7 +20,7 @@ deps =
-r{toxinidir}/test-requirements.txt
-r{toxinidir}/plugin-requirements.txt
commands =
flake8 ironic_discoverd
flake8 ironic_discoverd ironic_discoverd_ramdisk
doc8 README.rst CONTRIBUTING.rst HTTP-API.rst RELEASES.rst
[flake8]