Support hooks for processing data

Change-Id: I5ae1623664549663570f036fa4ae3661b19f1b89
Implements: blueprint plugin-architecture
This commit is contained in:
Dmitry Tantsur 2014-11-27 15:09:48 +01:00
parent 7894c3d3e7
commit dafa4d0013
10 changed files with 220 additions and 24 deletions

@ -192,11 +192,15 @@ Change Log
v1.0.0 v1.0.0
~~~~~~ ~~~~~~
* Add support for plugins that hook into data processing pipeline, see
`plugin-architecture blueprint`_ for details.
* Cache nodes under discovery in a local SQLite database. Set ``database`` * Cache nodes under discovery in a local SQLite database. Set ``database``
configuration option to persist this database. Improves performance by configuration option to persist this database. Improves performance by
making less calls to Ironic API. making less calls to Ironic API.
* Create ``CONTRIBUTING.rst``. * Create ``CONTRIBUTING.rst``.
.. _plugin-architecture blueprint: https://blueprints.launchpad.net/ironic-discoverd/+spec/plugin-architecture
v0.2.4 v0.2.4
~~~~~~ ~~~~~~

@ -36,5 +36,8 @@
; file. Do not use :memory: here, it won't work. ; file. Do not use :memory: here, it won't work.
;database = ;database =
; Comma-separated list of enabled hooks for processing pipeline.
;processing_hooks =
; Debug mode enabled/disabled. ; Debug mode enabled/disabled.
;debug = false ;debug = false

@ -25,6 +25,7 @@ DEFAULTS = {
'ironic_retry_attempts': '5', 'ironic_retry_attempts': '5',
'ironic_retry_period': '5', 'ironic_retry_period': '5',
'database': '', 'database': '',
'processing_hooks': '',
} }

@ -20,6 +20,7 @@ from ironicclient import exceptions
from ironic_discoverd import conf from ironic_discoverd import conf
from ironic_discoverd import firewall from ironic_discoverd import firewall
from ironic_discoverd import node_cache from ironic_discoverd import node_cache
from ironic_discoverd.plugins import base as plugins_base
from ironic_discoverd import utils from ironic_discoverd import utils
@ -28,6 +29,10 @@ LOG = logging.getLogger("discoverd")
def process(node_info): def process(node_info):
"""Process data from discovery ramdisk.""" """Process data from discovery ramdisk."""
hooks = plugins_base.processing_hooks_manager()
for hook_ext in hooks:
hook_ext.obj.pre_discover(node_info)
if node_info.get('error'): if node_info.get('error'):
LOG.error('Error happened during discovery: %s', LOG.error('Error happened during discovery: %s',
node_info['error']) node_info['error'])
@ -95,24 +100,46 @@ def process(node_info):
def _process_node(ironic, node, node_info, valid_macs): def _process_node(ironic, node, node_info, valid_macs):
patch = [{'op': 'add', 'path': '/extra/newly_discovered', 'value': 'true'}, hooks = plugins_base.processing_hooks_manager()
{'op': 'remove', 'path': '/extra/on_discovery'}]
existing = node.properties
for key in ('cpus', 'cpu_arch', 'memory_mb', 'local_gb'):
if not existing.get(key):
patch.append({'op': 'add', 'path': '/properties/%s' % key,
'value': str(node_info[key])})
ironic.node.update(node.uuid, patch)
ports = {}
for mac in valid_macs: for mac in valid_macs:
try: try:
ironic.port.create(node_uuid=node.uuid, address=mac) port = ironic.port.create(node_uuid=node.uuid, address=mac)
ports[mac] = port
except exceptions.Conflict: except exceptions.Conflict:
LOG.warning('MAC %(mac)s appeared in discovery data for ' LOG.warning('MAC %(mac)s appeared in discovery data for '
'node %(node)s, but already exists in ' 'node %(node)s, but already exists in '
'database - skipping', 'database - skipping',
{'mac': mac, 'node': node.uuid}) {'mac': mac, 'node': node.uuid})
patch = [{'op': 'add', 'path': '/extra/newly_discovered', 'value': 'true'},
{'op': 'remove', 'path': '/extra/on_discovery'}]
node_patches = []
port_patches = {}
for hook_ext in hooks:
hook_patch = hook_ext.obj.post_discover(node, list(ports.values()),
node_info)
if not hook_patch:
continue
node_patches.extend(hook_patch[0])
port_patches.update(hook_patch[1])
node_patches = [p for p in node_patches if p]
port_patches = {mac: patch for (mac, patch) in port_patches.items()
if mac in ports and patch}
existing = node.properties
for key in ('cpus', 'cpu_arch', 'memory_mb', 'local_gb'):
if not existing.get(key):
patch.append({'op': 'add', 'path': '/properties/%s' % key,
'value': str(node_info[key])})
ironic.node.update(node.uuid, patch + node_patches)
for mac, patches in port_patches.items():
ironic.port.update(ports[mac].uuid, patches)
LOG.info('Node %s was updated with data from discovery process, forcing ' LOG.info('Node %s was updated with data from discovery process, forcing '
'power off', node.uuid) 'power off', node.uuid)

@ -0,0 +1,83 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base code for plugins support."""
import abc
import six
from stevedore import named
from ironic_discoverd import conf
@six.add_metaclass(abc.ABCMeta)
class ProcessingHook(object): # pragma: no cover
"""Abstract base class for discovery data processing hooks."""
def pre_discover(self, node_info):
"""Pre-discovery hook.
This hook is run before any processing is done on data, even sanity
checks.
:param node_info: raw information sent by the ramdisk, may be modified
by the hook.
:returns: nothing.
"""
def post_discover(self, node, ports, discovered_data):
"""Post-discovery hook.
This hook is run after node is found, just before it's updated with the
data.
:param node: Ironic node as returned by the Ironic client, should not
be modified directly by the hook.
:param ports: Ironic ports created by discoverd, also should not be
updated directly.
:param discovered_data: processed data from the ramdisk.
:returns: tuple (node patches, port patches) where
*node_patches* is a list of JSON patches [RFC 6902] to apply
to the node, *port_patches* is a dict where keys are
port MAC's, values are lists of JSON patches, e.g.
::
(
[{'op': 'add', 'path': '/extra/foo', 'value': 'bar'}],
{'11:22:33:44:55:55': [
{'op': 'add', 'path': '/extra/foo', 'value': 'bar'}
]}
)
[RFC 6902] - http://tools.ietf.org/html/rfc6902
"""
_HOOKS_MGR = None
def processing_hooks_manager(*args):
"""Create a Stevedore extension manager for processing hooks.
:param args: arguments to pass to the hooks constructor.
"""
global _HOOKS_MGR
if _HOOKS_MGR is None:
names = [x.strip()
for x in conf.get('discoverd', 'processing_hooks').split(',')
if x.strip()]
_HOOKS_MGR = named.NamedExtensionManager('ironic_discoverd.hooks',
names=names,
invoke_on_load=True,
invoke_args=args,
name_order=True)
return _HOOKS_MGR

@ -0,0 +1,29 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example plugin."""
import logging
from ironic_discoverd.plugins import base
LOG = logging.getLogger('ironic_discoverd.plugins.example')
class ExampleProcessingHook(base.ProcessingHook): # pragma: no cover
def pre_discover(self, node_info):
LOG.info('pre-discover: %s', node_info)
def post_discover(self, node, ports, discovered_data):
LOG.info('post-discover: %s (node %s)', discovered_data, node.uuid)

@ -28,6 +28,8 @@ from ironic_discoverd import discoverd
from ironic_discoverd import firewall from ironic_discoverd import firewall
from ironic_discoverd import main from ironic_discoverd import main
from ironic_discoverd import node_cache from ironic_discoverd import node_cache
from ironic_discoverd.plugins import base as plugins_base
from ironic_discoverd.plugins import example as example_plugin
from ironic_discoverd import utils from ironic_discoverd import utils
@ -42,7 +44,8 @@ class BaseTest(unittest.TestCase):
self.addCleanup(lambda: os.unlink(node_cache._DB_NAME)) self.addCleanup(lambda: os.unlink(node_cache._DB_NAME))
# FIXME(dtantsur): this test suite is far from being complete @patch.object(example_plugin.ExampleProcessingHook, 'post_discover')
@patch.object(example_plugin.ExampleProcessingHook, 'pre_discover')
@patch.object(firewall, 'update_filters', autospec=True) @patch.object(firewall, 'update_filters', autospec=True)
@patch.object(node_cache, 'pop_node', autospec=True) @patch.object(node_cache, 'pop_node', autospec=True)
@patch.object(utils, 'get_client', autospec=True) @patch.object(utils, 'get_client', autospec=True)
@ -73,12 +76,26 @@ class TestProcess(BaseTest):
} }
} }
self.macs = ['11:22:33:44:55:66', 'broken', '', '66:55:44:33:22:11'] self.macs = ['11:22:33:44:55:66', 'broken', '', '66:55:44:33:22:11']
self.port = Mock(uuid='port_uuid')
def _do_test(self, client_mock, pop_mock, filters_mock, pre_mock,
post_mock):
plugins_base._HOOKS_MGR = None
conf.CONF.set('discoverd', 'processing_hooks', 'example')
def _do_test(self, client_mock, pop_mock, filters_mock):
cli = client_mock.return_value cli = client_mock.return_value
cli.port.create.side_effect = [None, exceptions.Conflict()]
def fake_port_create(node_uuid, address):
if address == '11:22:33:44:55:66':
return self.port
else:
raise exceptions.Conflict()
cli.port.create.side_effect = fake_port_create
pop_mock.return_value = self.node.uuid pop_mock.return_value = self.node.uuid
cli.node.get.return_value = self.node cli.node.get.return_value = self.node
post_mock.return_value = (['fake patch', 'fake patch 2'],
{'11:22:33:44:55:66': ['port patch']})
discoverd.process(self.data) discoverd.process(self.data)
@ -88,7 +105,9 @@ class TestProcess(BaseTest):
self.assertEqual(['11:22:33:44:55:66', '66:55:44:33:22:11'], self.assertEqual(['11:22:33:44:55:66', '66:55:44:33:22:11'],
sorted(pop_mock.call_args[1]['mac'])) sorted(pop_mock.call_args[1]['mac']))
cli.node.update.assert_called_once_with(self.node.uuid, self.patch) cli.node.update.assert_called_once_with(self.node.uuid,
self.patch + ['fake patch',
'fake patch 2'])
cli.port.create.assert_any_call(node_uuid=self.node.uuid, cli.port.create.assert_any_call(node_uuid=self.node.uuid,
address='11:22:33:44:55:66') address='11:22:33:44:55:66')
cli.port.create.assert_any_call(node_uuid=self.node.uuid, cli.port.create.assert_any_call(node_uuid=self.node.uuid,
@ -96,22 +115,30 @@ class TestProcess(BaseTest):
self.assertEqual(2, cli.port.create.call_count) self.assertEqual(2, cli.port.create.call_count)
filters_mock.assert_called_once_with(cli) filters_mock.assert_called_once_with(cli)
cli.node.set_power_state.assert_called_once_with(self.node.uuid, 'off') cli.node.set_power_state.assert_called_once_with(self.node.uuid, 'off')
cli.port.update.assert_called_once_with(self.port.uuid, ['port patch'])
def test_ok(self, client_mock, pop_mock, filters_mock): pre_mock.assert_called_once_with(self.data)
self._do_test(client_mock, pop_mock, filters_mock) post_mock.assert_called_once_with(self.node, [self.port], self.data)
def test_deprecated_macs(self, client_mock, pop_mock, filters_mock): def test_ok(self, client_mock, pop_mock, filters_mock, pre_mock,
post_mock):
self._do_test(client_mock, pop_mock, filters_mock, pre_mock, post_mock)
def test_deprecated_macs(self, client_mock, pop_mock, filters_mock,
pre_mock, post_mock):
del self.data['interfaces'] del self.data['interfaces']
self.data['macs'] = self.macs self.data['macs'] = self.macs
self._do_test(client_mock, pop_mock, filters_mock) self._do_test(client_mock, pop_mock, filters_mock, pre_mock, post_mock)
def test_ports_for_inactive(self, client_mock, pop_mock, filters_mock): def test_ports_for_inactive(self, client_mock, pop_mock, filters_mock,
pre_mock, post_mock):
del self.data['interfaces']['em4'] del self.data['interfaces']['em4']
conf.CONF.set('discoverd', 'ports_for_inactive_interfaces', conf.CONF.set('discoverd', 'ports_for_inactive_interfaces',
'true') 'true')
self._do_test(client_mock, pop_mock, filters_mock) self._do_test(client_mock, pop_mock, filters_mock, pre_mock, post_mock)
def test_not_found(self, client_mock, pop_mock, filters_mock): def test_not_found(self, client_mock, pop_mock, filters_mock, pre_mock,
post_mock):
cli = client_mock.return_value cli = client_mock.return_value
pop_mock.return_value = None pop_mock.return_value = None
@ -121,7 +148,8 @@ class TestProcess(BaseTest):
self.assertFalse(cli.port.create.called) self.assertFalse(cli.port.create.called)
self.assertFalse(cli.node.set_power_state.called) self.assertFalse(cli.node.set_power_state.called)
def test_not_found_in_ironic(self, client_mock, pop_mock, filters_mock): def test_not_found_in_ironic(self, client_mock, pop_mock, filters_mock,
pre_mock, post_mock):
cli = client_mock.return_value cli = client_mock.return_value
pop_mock.return_value = self.node.uuid pop_mock.return_value = self.node.uuid
cli.node.get.side_effect = exceptions.NotFound() cli.node.get.side_effect = exceptions.NotFound()
@ -536,5 +564,20 @@ class TestNodeCachePop(BaseTest):
"select * from attributes").fetchall()) "select * from attributes").fetchall())
class TestPlugins(unittest.TestCase):
@patch.object(example_plugin.ExampleProcessingHook, 'pre_discover',
autospec=True)
@patch.object(example_plugin.ExampleProcessingHook, 'post_discover',
autospec=True)
def test_hook(self, mock_post, mock_pre):
plugins_base._HOOKS_MGR = None
conf.CONF.set('discoverd', 'processing_hooks', 'example')
mgr = plugins_base.processing_hooks_manager()
mgr.map_method('pre_discover', 'node_info')
mock_pre.assert_called_once_with(ANY, 'node_info')
mgr.map_method('post_discover', 'node', ['port'], 'node_info')
mock_post.assert_called_once_with(ANY, 'node', ['port'], 'node_info')
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

@ -4,3 +4,4 @@ python-ironicclient>=0.2.1
python-keystoneclient>=0.10.0 python-keystoneclient>=0.10.0
requests>=2.2.0,!=2.4.0 requests>=2.2.0,!=2.4.0
six>=1.7.0 six>=1.7.0
stevedore>=1.1.0

@ -17,9 +17,14 @@ setup(
url = "https://pypi.python.org/pypi/ironic-discoverd", url = "https://pypi.python.org/pypi/ironic-discoverd",
packages = ['ironic_discoverd'], packages = ['ironic_discoverd'],
install_requires = install_requires, install_requires = install_requires,
entry_points = {'console_scripts': [ entry_points = {
"ironic-discoverd = ironic_discoverd.main:main" 'console_scripts': [
]}, "ironic-discoverd = ironic_discoverd.main:main"
],
'ironic_discoverd.hooks': [
"example = ironic_discoverd.plugins.example:ExampleProcessingHook",
],
},
classifiers = [ classifiers = [
'Development Status :: 5 - Production/Stable', 'Development Status :: 5 - Production/Stable',
'Environment :: OpenStack', 'Environment :: OpenStack',