All-in-one Ironic service with a local RPC bus

This adds a new executable /usr/bin/ironic (cool that we no longer have
a CLI with this name) that starts API and conductor together in the same
process. When an RPC host name matches the current one, the call is not
routed through the remote RPC, a local function call is done instead.

Story: #2009676
Task: #43953
Change-Id: I51bf7226aea145dc7c8fd93d61caa233ca16c9c9
This commit is contained in:
Dmitry Tantsur 2021-11-29 11:36:06 +01:00
parent 3f990beb97
commit 9a6f2d101b
10 changed files with 265 additions and 61 deletions

View File

@ -0,0 +1,52 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from oslo_config import cfg
from oslo_log import log
from oslo_service import service
from ironic.cmd import conductor as conductor_cmd
from ironic.common import rpc_service
from ironic.common import service as ironic_service
from ironic.common import wsgi_service
CONF = cfg.CONF
LOG = log.getLogger(__name__)
def main():
# NOTE(lucasagomes): Safeguard to prevent 'ironic.conductor.manager'
# from being imported prior to the configuration options being loaded.
# If this happened, the periodic decorators would always use the
# default values of the options instead of the configured ones. For
# more information see: https://bugs.launchpad.net/ironic/+bug/1562258
# and https://bugs.launchpad.net/ironic/+bug/1279774.
assert 'ironic.conductor.manager' not in sys.modules
# Parse config file and command line options, then start logging
ironic_service.prepare_service('ironic', sys.argv)
launcher = service.ServiceLauncher(CONF, restart_method='mutate')
mgr = rpc_service.RPCService(CONF.host,
'ironic.conductor.manager',
'ConductorManager')
conductor_cmd.issue_startup_warnings(CONF)
launcher.launch_service(mgr)
wsgi = wsgi_service.WSGIService('ironic_api', CONF.api.enable_ssl_api)
launcher.launch_service(wsgi)
launcher.wait()

View File

@ -31,6 +31,9 @@ ALLOWED_EXMODS = [
exception.__name__,
]
EXTRA_EXMODS = []
GLOBAL_MANAGER = None
MANAGER_TOPIC = 'ironic.conductor_manager'
def init(conf):
@ -148,3 +151,10 @@ def get_versioned_notifier(publisher_id=None):
assert VERSIONED_NOTIFIER is not None
assert publisher_id is not None
return VERSIONED_NOTIFIER.prepare(publisher_id=publisher_id)
def set_global_manager(manager):
global GLOBAL_MANAGER
if GLOBAL_MANAGER is not None and manager is not None:
raise RuntimeError("An attempt to set a global manager twice")
GLOBAL_MANAGER = manager

View File

@ -38,7 +38,7 @@ class RPCService(service.Service):
self.host = host
manager_module = importutils.try_import(manager_module)
manager_class = getattr(manager_module, manager_class)
self.manager = manager_class(host, manager_module.MANAGER_TOPIC)
self.manager = manager_class(host, rpc.MANAGER_TOPIC)
self.topic = self.manager.topic
self.rpcserver = None
self.deregister = True
@ -61,6 +61,7 @@ class RPCService(service.Service):
self.handle_signal()
self.manager.init_host(admin_context)
rpc.set_global_manager(self.manager)
LOG.info('Created RPC server for service %(service)s on host '
'%(host)s.',
@ -84,6 +85,7 @@ class RPCService(service.Service):
LOG.info('Stopped RPC server for service %(service)s on host '
'%(host)s.',
{'service': self.topic, 'host': self.host})
rpc.set_global_manager(None)
def _handle_signal(self, signo, frame):
LOG.info('Got signal SIGUSR1. Not deregistering on next shutdown '

View File

@ -77,8 +77,6 @@ from ironic import objects
from ironic.objects import base as objects_base
from ironic.objects import fields
MANAGER_TOPIC = 'ironic.conductor_manager'
LOG = log.getLogger(__name__)
METRICS = metrics_utils.get_metrics_logger(__name__)

View File

@ -21,6 +21,7 @@ Client side of the conductor RPC API.
import random
from ironic_lib.json_rpc import client as json_rpc
from oslo_log import log
import oslo_messaging as messaging
from ironic.common import exception
@ -28,12 +29,54 @@ from ironic.common import hash_ring
from ironic.common.i18n import _
from ironic.common import release_mappings as versions
from ironic.common import rpc
from ironic.conductor import manager
from ironic.conf import CONF
from ironic.db import api as dbapi
from ironic.objects import base as objects_base
LOG = log.getLogger(__name__)
class LocalContext:
"""Context to make calls to a local conductor."""
__slots__ = ()
def call(self, context, rpc_call_name, **kwargs):
"""Make a local conductor call."""
if rpc.GLOBAL_MANAGER is None:
raise exception.ServiceUnavailable(
_("The built-in conductor is not available, it might have "
"crashed. Please check the logs and correct the "
"configuration, if required."))
try:
return getattr(rpc.GLOBAL_MANAGER, rpc_call_name)(context,
**kwargs)
# FIXME(dtantsur): can we somehow avoid wrapping the exception?
except messaging.ExpectedException as exc:
exc_value, exc_tb = exc.exc_info[1:]
raise exc_value.with_traceback(exc_tb) from None
def cast(self, context, rpc_call_name, **kwargs):
"""Make a local conductor call.
It is expected that the underlying call uses a thread to avoid
blocking the caller.
Any exceptions are logged and ignored.
"""
try:
return self.call(context, rpc_call_name, **kwargs)
except Exception:
# In real RPC, casts are completely asynchronous and never return
# actual errors.
LOG.exception('Ignoring unhandled exception from RPC cast %s',
rpc_call_name)
_LOCAL_CONTEXT = LocalContext()
class ConductorAPI(object):
"""Client side of the conductor RPC API.
@ -120,7 +163,7 @@ class ConductorAPI(object):
super(ConductorAPI, self).__init__()
self.topic = topic
if self.topic is None:
self.topic = manager.MANAGER_TOPIC
self.topic = rpc.MANAGER_TOPIC
serializer = objects_base.IronicObjectSerializer()
release_ver = versions.RELEASE_MAPPING.get(CONF.pin_release_version)
@ -139,6 +182,30 @@ class ConductorAPI(object):
# NOTE(tenbrae): this is going to be buggy
self.ring_manager = hash_ring.HashRingManager()
def _prepare_call(self, topic, version=None):
"""Prepare an RPC call.
If a conductor exists in the same process, a direct function call
is used instead of real RPC.
:param topic: RPC topic to send to.
:param version: RPC API version to require.
"""
# FIXME(dtantsur): this doesn't work with either JSON RPC or local
# conductor. Do we even need this fallback?
topic = topic or self.topic
# Normally a topic is a <topic prefix>.<hostname>, we need to extract
# the hostname to match it against the current host.
host = topic[len(self.topic) + 1:]
if rpc.GLOBAL_MANAGER is not None and host == CONF.host:
# Short-cut to a local function call if there is a built-in
# conductor.
return _LOCAL_CONTEXT
# Normal RPC path
return self.client.prepare(topic=topic, version=version)
def get_conductor_for(self, node):
"""Get the conductor which the node is mapped to.
@ -231,7 +298,7 @@ class ConductorAPI(object):
:raises: NoValidDefaultForInterface if no default can be calculated
for some interfaces, and explicit values must be provided.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.36')
cctxt = self._prepare_call(topic=topic, version='1.36')
return cctxt.call(context, 'create_node', node_obj=node_obj)
def update_node(self, context, node_obj, topic=None,
@ -257,7 +324,7 @@ class ConductorAPI(object):
for some interfaces, and explicit values must be provided.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.1')
cctxt = self._prepare_call(topic=topic, version='1.1')
return cctxt.call(context, 'update_node', node_obj=node_obj,
reset_interfaces=reset_interfaces)
@ -278,7 +345,7 @@ class ConductorAPI(object):
async task.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.39')
cctxt = self._prepare_call(topic=topic, version='1.39')
return cctxt.call(context, 'change_node_power_state', node_id=node_id,
new_state=new_state, timeout=timeout)
@ -298,7 +365,7 @@ class ConductorAPI(object):
async task.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.55')
cctxt = self._prepare_call(topic=topic, version='1.55')
return cctxt.call(context, 'change_node_boot_mode', node_id=node_id,
new_state=new_state)
@ -318,7 +385,7 @@ class ConductorAPI(object):
async task.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.55')
cctxt = self._prepare_call(topic=topic, version='1.55')
return cctxt.call(context, 'change_node_secure_boot', node_id=node_id,
new_state=new_state)
@ -355,7 +422,7 @@ class ConductorAPI(object):
or return it in the response body (False).
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.20')
cctxt = self._prepare_call(topic=topic, version='1.20')
return cctxt.call(context, 'vendor_passthru', node_id=node_id,
driver_method=driver_method,
http_method=http_method,
@ -400,7 +467,7 @@ class ConductorAPI(object):
or return it in the response body (False).
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.20')
cctxt = self._prepare_call(topic=topic, version='1.20')
return cctxt.call(context, 'driver_vendor_passthru',
driver_name=driver_name,
driver_method=driver_method,
@ -416,7 +483,7 @@ class ConductorAPI(object):
:returns: dictionary of <method name>:<method metadata> entries.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.21')
cctxt = self._prepare_call(topic=topic, version='1.21')
return cctxt.call(context, 'get_node_vendor_passthru_methods',
node_id=node_id)
@ -438,7 +505,7 @@ class ConductorAPI(object):
:returns: dictionary of <method name>:<method metadata> entries.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.21')
cctxt = self._prepare_call(topic=topic, version='1.21')
return cctxt.call(context, 'get_driver_vendor_passthru_methods',
driver_name=driver_name)
@ -468,7 +535,7 @@ class ConductorAPI(object):
version = '1.52'
new_kws['deploy_steps'] = deploy_steps
cctxt = self.client.prepare(topic=topic or self.topic, version=version)
cctxt = self._prepare_call(topic=topic, version=version)
return cctxt.call(context, 'do_node_deploy', node_id=node_id,
rebuild=rebuild, configdrive=configdrive, **new_kws)
@ -488,7 +555,7 @@ class ConductorAPI(object):
deployed state before this method is called.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.6')
cctxt = self._prepare_call(topic=topic, version='1.6')
return cctxt.call(context, 'do_node_tear_down', node_id=node_id)
def do_provisioning_action(self, context, node_id, action, topic=None):
@ -506,7 +573,7 @@ class ConductorAPI(object):
This encapsulates some provisioning actions in a single call.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.23')
cctxt = self._prepare_call(topic=topic, version='1.23')
return cctxt.call(context, 'do_provisioning_action',
node_id=node_id, action=action)
@ -520,7 +587,7 @@ class ConductorAPI(object):
:param node_id: node id or uuid.
:param topic: RPC topic. Defaults to self.topic.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.27')
cctxt = self._prepare_call(topic=topic, version='1.27')
return cctxt.cast(context, 'continue_node_clean',
node_id=node_id)
@ -534,7 +601,7 @@ class ConductorAPI(object):
:param node_id: node id or uuid.
:param topic: RPC topic. Defaults to self.topic.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.45')
cctxt = self._prepare_call(topic=topic, version='1.45')
return cctxt.cast(context, 'continue_node_deploy',
node_id=node_id)
@ -548,7 +615,7 @@ class ConductorAPI(object):
interface validation.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.5')
cctxt = self._prepare_call(topic=topic, version='1.5')
return cctxt.call(context, 'validate_driver_interfaces',
node_id=node_id)
@ -564,7 +631,7 @@ class ConductorAPI(object):
:raises: InvalidState if the node is in the wrong provision
state to perform deletion.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.9')
cctxt = self._prepare_call(topic=topic, version='1.9')
return cctxt.call(context, 'destroy_node', node_id=node_id)
def get_console_information(self, context, node_id, topic=None):
@ -578,7 +645,7 @@ class ConductorAPI(object):
:raises: InvalidParameterValue when the wrong driver info is specified.
:raises: MissingParameterValue if a required parameter is missing
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.11')
cctxt = self._prepare_call(topic=topic, version='1.11')
return cctxt.call(context, 'get_console_information', node_id=node_id)
def set_console_mode(self, context, node_id, enabled, topic=None):
@ -596,7 +663,7 @@ class ConductorAPI(object):
:raises: NoFreeConductorWorker when there is no free worker to start
async task.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.11')
cctxt = self._prepare_call(topic=topic, version='1.11')
return cctxt.call(context, 'set_console_mode', node_id=node_id,
enabled=enabled)
@ -612,7 +679,7 @@ class ConductorAPI(object):
:param topic: RPC topic. Defaults to self.topic.
:returns: created port object.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.41')
cctxt = self._prepare_call(topic=topic, version='1.41')
return cctxt.call(context, 'create_port', port_obj=port_obj)
def update_port(self, context, port_obj, topic=None):
@ -628,7 +695,7 @@ class ConductorAPI(object):
:returns: updated port object, including all fields.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.13')
cctxt = self._prepare_call(topic=topic, version='1.13')
return cctxt.call(context, 'update_port', port_obj=port_obj)
def update_portgroup(self, context, portgroup_obj, topic=None):
@ -645,7 +712,7 @@ class ConductorAPI(object):
:returns: updated portgroup object, including all fields.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.33')
cctxt = self._prepare_call(topic=topic, version='1.33')
return cctxt.call(context, 'update_portgroup',
portgroup_obj=portgroup_obj)
@ -660,7 +727,7 @@ class ConductorAPI(object):
not exist.
:raises: PortgroupNotEmpty if portgroup is not empty
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.33')
cctxt = self._prepare_call(topic=topic, version='1.33')
return cctxt.call(context, 'destroy_portgroup', portgroup=portgroup)
def get_driver_properties(self, context, driver_name, topic=None):
@ -674,7 +741,7 @@ class ConductorAPI(object):
:raises: DriverNotFound.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.16')
cctxt = self._prepare_call(topic=topic, version='1.16')
return cctxt.call(context, 'get_driver_properties',
driver_name=driver_name)
@ -699,7 +766,7 @@ class ConductorAPI(object):
specified or an invalid boot device is specified.
:raises: MissingParameterValue if missing supplied info.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.17')
cctxt = self._prepare_call(topic=topic, version='1.17')
return cctxt.call(context, 'set_boot_device', node_id=node_id,
device=device, persistent=persistent)
@ -725,7 +792,7 @@ class ConductorAPI(object):
future boots or not, None if it is unknown.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.17')
cctxt = self._prepare_call(topic=topic, version='1.17')
return cctxt.call(context, 'get_boot_device', node_id=node_id)
def inject_nmi(self, context, node_id, topic=None):
@ -745,7 +812,7 @@ class ConductorAPI(object):
:raises: MissingParameterValue if missing supplied info.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.40')
cctxt = self._prepare_call(topic=topic, version='1.40')
return cctxt.call(context, 'inject_nmi', node_id=node_id)
def get_supported_boot_devices(self, context, node_id, topic=None):
@ -766,7 +833,7 @@ class ConductorAPI(object):
in :mod:`ironic.common.boot_devices`.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.17')
cctxt = self._prepare_call(topic=topic, version='1.17')
return cctxt.call(context, 'get_supported_boot_devices',
node_id=node_id)
@ -791,7 +858,7 @@ class ConductorAPI(object):
:raises: MissingParameterValue if missing supplied info.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.50')
cctxt = self._prepare_call(topic=topic, version='1.50')
return cctxt.call(context, 'set_indicator_state', node_id=node_id,
component=component, indicator=indicator,
state=state)
@ -817,7 +884,7 @@ class ConductorAPI(object):
mod:`ironic.common.indicator_states`.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.50')
cctxt = self._prepare_call(topic=topic, version='1.50')
return cctxt.call(context, 'get_indicator_state', node_id=node_id,
component=component, indicator=indicator)
@ -849,7 +916,7 @@ class ConductorAPI(object):
}
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.50')
cctxt = self._prepare_call(topic=topic, version='1.50')
return cctxt.call(context, 'get_supported_indicators', node_id=node_id,
component=component)
@ -869,7 +936,7 @@ class ConductorAPI(object):
action to do in the current state.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.24')
cctxt = self._prepare_call(topic=topic, version='1.24')
return cctxt.call(context, 'inspect_hardware', node_id=node_id)
def destroy_port(self, context, port, topic=None):
@ -882,7 +949,7 @@ class ConductorAPI(object):
:raises: NodeNotFound if the node associated with the port does not
exist.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.25')
cctxt = self._prepare_call(topic=topic, version='1.25')
return cctxt.call(context, 'destroy_port', port=port)
def set_target_raid_config(self, context, node_id, target_raid_config,
@ -904,7 +971,7 @@ class ConductorAPI(object):
missing.
:raises: NodeLocked if node is locked by another conductor.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.30')
cctxt = self._prepare_call(topic=topic, version='1.30')
return cctxt.call(context, 'set_target_raid_config',
node_id=node_id,
target_raid_config=target_raid_config)
@ -929,7 +996,7 @@ class ConductorAPI(object):
:returns: A dictionary containing the properties that can be mentioned
for logical disks and a textual description for them.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.30')
cctxt = self._prepare_call(topic=topic, version='1.30')
return cctxt.call(context, 'get_raid_logical_disk_properties',
driver_name=driver_name)
@ -957,7 +1024,7 @@ class ConductorAPI(object):
params['disable_ramdisk'] = disable_ramdisk
version = '1.53'
cctxt = self.client.prepare(topic=topic or self.topic, version=version)
cctxt = self._prepare_call(topic=topic, version=version)
return cctxt.call(context, 'do_node_clean',
node_id=node_id, clean_steps=clean_steps, **params)
@ -993,7 +1060,7 @@ class ConductorAPI(object):
version = '1.54'
new_kws['agent_status'] = agent_status
new_kws['agent_status_message'] = agent_status_message
cctxt = self.client.prepare(topic=topic or self.topic, version=version)
cctxt = self._prepare_call(topic=topic, version=version)
return cctxt.call(context, 'heartbeat', node_id=node_id,
callback_url=callback_url, **new_kws)
@ -1019,7 +1086,7 @@ class ConductorAPI(object):
raise NotImplementedError(_('Incompatible conductor version - '
'please upgrade ironic-conductor '
'first'))
cctxt = self.client.prepare(topic=self.topic, version='1.31')
cctxt = self._prepare_call(topic=self.topic, version='1.31')
return cctxt.call(context, 'object_class_action_versions',
objname=objname, objmethod=objmethod,
object_versions=object_versions,
@ -1045,7 +1112,7 @@ class ConductorAPI(object):
raise NotImplementedError(_('Incompatible conductor version - '
'please upgrade ironic-conductor '
'first'))
cctxt = self.client.prepare(topic=self.topic, version='1.31')
cctxt = self._prepare_call(topic=self.topic, version='1.31')
return cctxt.call(context, 'object_action', objinst=objinst,
objmethod=objmethod, args=args, kwargs=kwargs)
@ -1070,7 +1137,7 @@ class ConductorAPI(object):
raise NotImplementedError(_('Incompatible conductor version - '
'please upgrade ironic-conductor '
'first'))
cctxt = self.client.prepare(topic=self.topic, version='1.31')
cctxt = self._prepare_call(topic=self.topic, version='1.31')
return cctxt.call(context, 'object_backport_versions', objinst=objinst,
object_versions=object_versions)
@ -1089,7 +1156,7 @@ class ConductorAPI(object):
:raises: VolumeConnectorNotFound if the volume connector cannot be
found
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.35')
cctxt = self._prepare_call(topic=topic, version='1.35')
return cctxt.call(context, 'destroy_volume_connector',
connector=connector)
@ -1116,7 +1183,7 @@ class ConductorAPI(object):
:returns: updated volume connector object, including all fields.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.35')
cctxt = self._prepare_call(topic=topic, version='1.35')
return cctxt.call(context, 'update_volume_connector',
connector=connector)
@ -1131,7 +1198,7 @@ class ConductorAPI(object):
not exist
:raises: VolumeTargetNotFound if the volume target cannot be found
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.37')
cctxt = self._prepare_call(topic=topic, version='1.37')
return cctxt.call(context, 'destroy_volume_target',
target=target)
@ -1156,7 +1223,7 @@ class ConductorAPI(object):
:returns: updated volume target object, including all fields
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.37')
cctxt = self._prepare_call(topic=topic, version='1.37')
return cctxt.call(context, 'update_volume_target',
target=target)
@ -1174,7 +1241,7 @@ class ConductorAPI(object):
:raises: InvalidParameterValue, if a parameter that's required for
VIF attach is wrong/missing.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.38')
cctxt = self._prepare_call(topic=topic, version='1.38')
return cctxt.call(context, 'vif_attach', node_id=node_id,
vif_info=vif_info)
@ -1190,7 +1257,7 @@ class ConductorAPI(object):
:raises: InvalidParameterValue, if a parameter that's required for
VIF detach is wrong/missing.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.38')
cctxt = self._prepare_call(topic=topic, version='1.38')
return cctxt.call(context, 'vif_detach', node_id=node_id,
vif_id=vif_id)
@ -1206,7 +1273,7 @@ class ConductorAPI(object):
:raises: InvalidParameterValue, if a parameter that's required for
VIF list is wrong/missing.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.38')
cctxt = self._prepare_call(topic=topic, version='1.38')
return cctxt.call(context, 'vif_list', node_id=node_id)
def do_node_rescue(self, context, node_id, rescue_password, topic=None):
@ -1225,7 +1292,7 @@ class ConductorAPI(object):
state before this method is called.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.43')
cctxt = self._prepare_call(topic=topic, version='1.43')
return cctxt.call(context, 'do_node_rescue', node_id=node_id,
rescue_password=rescue_password)
@ -1243,7 +1310,7 @@ class ConductorAPI(object):
state before this method is called.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.43')
cctxt = self._prepare_call(topic=topic, version='1.43')
return cctxt.call(context, 'do_node_unrescue', node_id=node_id)
def add_node_traits(self, context, node_id, traits, replace=False,
@ -1260,7 +1327,7 @@ class ConductorAPI(object):
:raises: NodeLocked if node is locked by another conductor.
:raises: NodeNotFound if the node does not exist.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.44')
cctxt = self._prepare_call(topic=topic, version='1.44')
return cctxt.call(context, 'add_node_traits', node_id=node_id,
traits=traits, replace=replace)
@ -1276,7 +1343,7 @@ class ConductorAPI(object):
:raises: NodeNotFound if the node does not exist.
:raises: NodeTraitNotFound if one of the traits is not found.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.44')
cctxt = self._prepare_call(topic=topic, version='1.44')
return cctxt.call(context, 'remove_node_traits', node_id=node_id,
traits=traits)
@ -1287,7 +1354,7 @@ class ConductorAPI(object):
:param allocation: an allocation object.
:param topic: RPC topic. Defaults to self.topic.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.48')
cctxt = self._prepare_call(topic=topic, version='1.48')
return cctxt.call(context, 'create_allocation', allocation=allocation)
def destroy_allocation(self, context, allocation, topic=None):
@ -1299,7 +1366,7 @@ class ConductorAPI(object):
:raises: InvalidState if the associated node is in the wrong provision
state to perform deallocation.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.48')
cctxt = self._prepare_call(topic=topic, version='1.48')
return cctxt.call(context, 'destroy_allocation', allocation=allocation)
def get_node_with_token(self, context, node_id, topic=None):
@ -1312,5 +1379,5 @@ class ConductorAPI(object):
:returns: A Node object with agent token.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.49')
cctxt = self._prepare_call(topic=topic, version='1.49')
return cctxt.call(context, 'get_node_with_token', node_id=node_id)

View File

@ -43,6 +43,7 @@ from ironic.common import config as ironic_config
from ironic.common import context as ironic_context
from ironic.common import driver_factory
from ironic.common import hash_ring
from ironic.common import rpc
from ironic.common import utils as common_utils
from ironic.conf import CONF
from ironic.drivers import base as drivers_base
@ -117,6 +118,8 @@ class TestCase(oslo_test_base.BaseTestCase):
for factory in driver_factory._INTERFACE_LOADERS.values():
factory._extension_manager = None
rpc.set_global_manager(None)
# Ban running external processes via 'execute' like functions. If the
# patched function is called, an exception is raised to warn the
# tester.

View File

@ -54,3 +54,4 @@ class TestRPCService(base.TestCase):
mock_prepare_method.assert_called_once_with(self.rpc_svc.manager)
mock_init_method.assert_called_once_with(self.rpc_svc.manager,
mock_ctx.return_value)
self.assertIs(rpc.GLOBAL_MANAGER, self.rpc_svc.manager)

View File

@ -21,6 +21,7 @@ Unit Tests for :py:class:`ironic.conductor.rpcapi.ConductorAPI`.
import copy
from unittest import mock
from ironic_lib.json_rpc import client as json_rpc
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging import _utils as messaging_utils
@ -31,6 +32,7 @@ from ironic.common import components
from ironic.common import exception
from ironic.common import indicator_states
from ironic.common import release_mappings
from ironic.common import rpc
from ironic.common import states
from ironic.conductor import manager as conductor_manager
from ironic.conductor import rpcapi as conductor_rpcapi
@ -242,9 +244,7 @@ class RPCAPITestCase(db_base.DbTestCase):
expected_retval = 'hello world' if rpc_method == 'call' else None
expected_topic = 'fake-topic'
if 'host' in kwargs:
expected_topic += ".%s" % kwargs['host']
expected_topic = kwargs.get('topic', 'fake-topic')
target = {
"topic": expected_topic,
@ -715,3 +715,67 @@ class RPCAPITestCase(db_base.DbTestCase):
'call',
allocation='fake-allocation',
version='1.48')
@mock.patch.object(rpc, 'GLOBAL_MANAGER',
spec_set=conductor_manager.ConductorManager)
def test_local_call(self, mock_manager):
CONF.set_override('host', 'fake.host')
rpcapi = conductor_rpcapi.ConductorAPI(topic='fake.topic')
rpcapi.create_node(mock.sentinel.context, mock.sentinel.node,
topic='fake.topic.fake.host')
mock_manager.create_node.assert_called_once_with(
mock.sentinel.context, node_obj=mock.sentinel.node)
@mock.patch.object(rpc, 'GLOBAL_MANAGER',
spec_set=conductor_manager.ConductorManager)
def test_local_call_host_mismatch(self, mock_manager):
CONF.set_override('host', 'fake.host')
rpcapi = conductor_rpcapi.ConductorAPI(topic='fake.topic')
rpcapi.client = mock.Mock(spec_set=json_rpc.Client)
rpcapi.create_node(mock.sentinel.context, mock.sentinel.node,
topic='fake.topic.not-fake.host')
mock_manager.create_node.assert_not_called()
rpcapi.client.prepare.assert_called_once_with(
topic='fake.topic.not-fake.host', version=mock.ANY)
@mock.patch.object(rpc, 'GLOBAL_MANAGER',
spec_set=conductor_manager.ConductorManager)
def test_local_cast(self, mock_manager):
CONF.set_override('host', 'fake.host')
rpcapi = conductor_rpcapi.ConductorAPI(topic='fake.topic')
cctxt = rpcapi._prepare_call(topic='fake.topic.fake.host')
cctxt.cast(mock.sentinel.context, 'create_node',
node_obj=mock.sentinel.node)
mock_manager.create_node.assert_called_once_with(
mock.sentinel.context, node_obj=mock.sentinel.node)
@mock.patch.object(conductor_rpcapi.LOG, 'exception', autospec=True)
@mock.patch.object(rpc, 'GLOBAL_MANAGER',
spec_set=conductor_manager.ConductorManager)
def test_local_cast_error(self, mock_manager, mock_log):
CONF.set_override('host', 'fake.host')
mock_manager.create_node.side_effect = RuntimeError('boom')
rpcapi = conductor_rpcapi.ConductorAPI(topic='fake.topic')
cctxt = rpcapi._prepare_call(topic='fake.topic.fake.host')
cctxt.cast(mock.sentinel.context, 'create_node',
node_obj=mock.sentinel.node)
mock_manager.create_node.assert_called_once_with(
mock.sentinel.context, node_obj=mock.sentinel.node)
self.assertTrue(mock_log.called)
@mock.patch.object(rpc, 'GLOBAL_MANAGER',
spec_set=conductor_manager.ConductorManager)
def test_local_call_expected_exception(self, mock_manager):
@messaging.expected_exceptions(exception.InvalidParameterValue)
def fake_create(context, node_obj):
raise exception.InvalidParameterValue('sorry')
CONF.set_override('host', 'fake.host')
rpcapi = conductor_rpcapi.ConductorAPI(topic='fake.topic')
mock_manager.create_node.side_effect = fake_create
self.assertRaisesRegex(exception.InvalidParameterValue, 'sorry',
rpcapi.create_node,
mock.sentinel.context, mock.sentinel.node,
topic='fake.topic.fake.host')
mock_manager.create_node.assert_called_once_with(
mock.sentinel.context, node_obj=mock.sentinel.node)

View File

@ -0,0 +1,6 @@
---
features:
- |
Adds a new executable ``ironic`` that starts both API and conductor in the
same process. Calls between the API and conductor instances in the same
process are not routed through the RPC.

View File

@ -41,6 +41,7 @@ oslo.policy.policies =
ironic.api = ironic.common.policy:list_policies
console_scripts =
ironic = ironic.cmd.singleprocess:main
ironic-api = ironic.cmd.api:main
ironic-dbsync = ironic.cmd.dbsync:main
ironic-conductor = ironic.cmd.conductor:main