Add ironic-python-agent deploy driver

This commit adds a deploy driver for the ironic-python-agent
project. Caveats:

* Only basic functionality (deploy and tear_down) is implemented.
* Only whole disk images are supported.

Implements: blueprint agent-driver

Change-Id: Iebf8d8f756770549d6fcd1bb2fe94d2585d576b1
This commit is contained in:
Jim Rollenhagen 2014-06-18 13:49:59 -07:00
parent 980e7689d8
commit 3e568fbbbc
13 changed files with 1342 additions and 2 deletions

View File

@ -451,6 +451,36 @@
#run_external_periodic_tasks=true
[agent]
#
# Options defined in ironic.drivers.modules.agent
#
# Additional append parameters for baremetal PXE boot. (string
# value)
#agent_pxe_append_params=nofb nomodeset vga=normal
# Template file for PXE configuration. (string value)
#agent_pxe_config_template=$pybasedir/drivers/modules/agent_config.template
# Neutron bootfile DHCP parameter. (string value)
#agent_pxe_bootfile_name=pxelinux.0
# Maximum interval (in seconds) for agent heartbeats. (integer
# value)
#heartbeat_timeout=300
#
# Options defined in ironic.drivers.modules.agent_client
#
# API version to use for communicating with the ramdisk agent.
# (string value)
#agent_api_version=v1
[api]
#

View File

@ -29,7 +29,12 @@ app = {
'static_root': '%(confdir)s/public',
'debug': False,
'enable_acl': True,
'acl_public_routes': ['/', '/v1'],
'acl_public_routes': [
'/',
'/v1',
'/v1/drivers/agent_[a-z]*/vendor_passthru/lookup',
'/v1/nodes/[a-z0-9\-]+/vendor_passthru/heartbeat'
],
}
# WSME Configurations

87
ironic/drivers/agent.py Normal file
View File

@ -0,0 +1,87 @@
# Copyright 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ironic.drivers import base
from ironic.drivers.modules import agent
from ironic.drivers.modules import ipminative
from ironic.drivers.modules import ipmitool
from ironic.drivers.modules import ssh
from ironic.drivers import utils
class AgentAndIPMIToolDriver(base.BaseDriver):
    """Agent + IPMITool driver.

    Pairs :class:`ironic.drivers.modules.ipmitool.IPMIPower` (power on/off
    and reboot) with :class:`ironic.drivers.modules.agent.AgentDeploy`
    (image deployment). The class carries no logic of its own; it only
    wires those interface implementations together.
    """

    def __init__(self):
        self.power = ipmitool.IPMIPower()
        self.deploy = agent.AgentDeploy()
        # One vendor interface instance backs both the node-level
        # ('heartbeat') and the driver-level ('lookup') passthru routes.
        vendor_impl = agent.AgentVendorInterface()
        self.agent_vendor = vendor_impl
        self.mapping = {'heartbeat': vendor_impl}
        self.dl_mapping = {'lookup': vendor_impl}
        self.vendor = utils.MixinVendorInterface(
            self.mapping, driver_passthru_mapping=self.dl_mapping)
class AgentAndIPMINativeDriver(base.BaseDriver):
    """Agent + IPMINative driver.

    Pairs :class:`ironic.drivers.modules.ipminative.NativeIPMIPower` (power
    on/off and reboot) with
    :class:`ironic.drivers.modules.agent.AgentDeploy` (image deployment).
    The class carries no logic of its own; it only wires those interface
    implementations together.
    """

    def __init__(self):
        self.power = ipminative.NativeIPMIPower()
        self.deploy = agent.AgentDeploy()
        # One vendor interface instance backs both the node-level
        # ('heartbeat') and the driver-level ('lookup') passthru routes.
        vendor_impl = agent.AgentVendorInterface()
        self.agent_vendor = vendor_impl
        self.mapping = {'heartbeat': vendor_impl}
        self.dl_mapping = {'lookup': vendor_impl}
        self.vendor = utils.MixinVendorInterface(
            self.mapping, driver_passthru_mapping=self.dl_mapping)
class AgentAndSSHDriver(base.BaseDriver):
    """Agent + SSH driver.

    NOTE: This driver is meant only for testing environments.

    Pairs :class:`ironic.drivers.modules.ssh.SSH` (power on/off and reboot
    of virtual machines tunneled over SSH) with
    :class:`ironic.drivers.modules.agent.AgentDeploy` (image deployment).
    The class carries no logic of its own; it only wires those interface
    implementations together.
    """

    def __init__(self):
        self.power = ssh.SSHPower()
        self.deploy = agent.AgentDeploy()
        self.management = ssh.SSHManagement()
        # One vendor interface instance backs both the node-level
        # ('heartbeat') and the driver-level ('lookup') passthru routes.
        vendor_impl = agent.AgentVendorInterface()
        self.agent_vendor = vendor_impl
        self.mapping = {'heartbeat': vendor_impl}
        self.dl_mapping = {'lookup': vendor_impl}
        self.vendor = utils.MixinVendorInterface(
            self.mapping, driver_passthru_mapping=self.dl_mapping)

View File

@ -19,6 +19,7 @@ Fake drivers used in testing.
from ironic.common import exception
from ironic.drivers import base
from ironic.drivers.modules import agent
from ironic.drivers.modules import fake
from ironic.drivers.modules import ipminative
from ironic.drivers.modules import ipmitool
@ -95,3 +96,12 @@ class FakeSeaMicroDriver(base.BaseDriver):
self.deploy = fake.FakeDeploy()
self.management = seamicro.Management()
self.vendor = seamicro.VendorPassthru()
class FakeAgentDriver(base.BaseDriver):
    """Example agent driver: fake power combined with the agent modules."""

    def __init__(self):
        # Power is faked; deploy and vendor use the agent implementations.
        self.power = fake.FakePower()
        self.deploy = agent.AgentDeploy()
        self.vendor = agent.AgentVendorInterface()

View File

@ -0,0 +1,635 @@
# Copyright 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from oslo.config import cfg
from ironic.common import exception
from ironic.common import i18n
from ironic.common import image_service
from ironic.common import images
from ironic.common import keystone
from ironic.common import neutron
from ironic.common import paths
from ironic.common import pxe_utils
from ironic.common import states
from ironic.common import utils
from ironic.conductor import task_manager
from ironic.conductor import utils as manager_utils
from ironic.db import api as dbapi
from ironic.drivers import base
from ironic.drivers.modules import agent_client
from ironic.drivers.modules import image_cache
from ironic.objects import node as node_module
from ironic.openstack.common import excutils
from ironic.openstack.common import fileutils
from ironic.openstack.common import log
# Short alias for i18n._LW; it wraps the messages passed to LOG.warning
# in this module.
_LW = i18n._LW
# Configuration options owned by this module. They are registered below
# under the [agent] group, so they are read as CONF.agent.<name>.
agent_opts = [
cfg.StrOpt('agent_pxe_append_params',
default='nofb nomodeset vga=normal',
help='Additional append parameters for baremetal PXE boot.'),
cfg.StrOpt('agent_pxe_config_template',
default=paths.basedir_def(
'drivers/modules/agent_config.template'),
help='Template file for PXE configuration.'),
cfg.StrOpt('agent_pxe_bootfile_name',
default='pxelinux.0',
help='Neutron bootfile DHCP parameter.'),
cfg.IntOpt('heartbeat_timeout',
default=300,
help='Maximum interval (in seconds) for agent heartbeats.'),
]
CONF = cfg.CONF
# 'my_ip' is declared by ironic.netconf; importing it makes the option
# available on CONF from this module.
CONF.import_opt('my_ip', 'ironic.netconf')
CONF.register_opts(agent_opts, group='agent')
# Module-level logger named after this module.
LOG = log.getLogger(__name__)
def _time():
"""Broken out for testing."""
return time.time()
def _get_client():
    """Create and return a new AgentClient instance."""
    return agent_client.AgentClient()
def _build_pxe_config_options(pxe_info):
    """Build the option dict rendered into the agent PXE config template.

    :param pxe_info: mapping with 'deploy_kernel' and 'deploy_ramdisk'
                     entries; element [1] of each entry is used as the
                     kernel / ramdisk path.
    :returns: dict with the kernel path, ramdisk path, extra kernel append
              parameters and the Ironic API URL handed to the agent.
    """
    # Prefer the configured conductor API URL; otherwise ask keystone.
    api_url = CONF.conductor.api_url or keystone.get_service_url()
    api_url = api_url.rstrip('/')

    options = {}
    options['deployment_aki_path'] = pxe_info['deploy_kernel'][1]
    options['deployment_ari_path'] = pxe_info['deploy_ramdisk'][1]
    options['pxe_append_params'] = CONF.agent.agent_pxe_append_params
    options['ipa_api_url'] = api_url
    return options
def _get_tftp_image_info(node):
    """Return the deploy kernel/ramdisk info for a node.

    Thin wrapper that passes the node's UUID and driver_info to
    pxe_utils.get_deploy_kr_info().
    """
    node_uuid = node.uuid
    deploy_info = node.driver_info
    return pxe_utils.get_deploy_kr_info(node_uuid, deploy_info)
def _set_failed_state(task, msg):
    """Set a node's error state and provision state to signal Nova.

    When deploy steps aren't called explicitly by the conductor, but are
    the result of callbacks, the node's state has to be set explicitly.
    This tells Nova to change the instance's status so the user can see
    their deploy/tear down had an issue and makes debugging/deleting Nova
    instances easier.

    :param task: a TaskManager instance holding the failed node.
    :param msg: error text stored in the node's last_error; it is replaced
                by a power-off failure message if powering off fails.
    """
    failed_node = task.node
    failed_node.provision_state = states.DEPLOYFAIL
    failed_node.target_provision_state = states.NOSTATE
    failed_node.save(task.context)

    try:
        manager_utils.node_power_action(task, states.POWER_OFF)
    except Exception:
        msg = (_('Node %s failed to power off while handling deploy '
                 'failure. This may be a serious condition. Node '
                 'should be removed from Ironic or put in maintenance '
                 'mode until the problem is resolved.') % failed_node.uuid)
        LOG.error(msg)
    finally:
        # NOTE(deva): node_power_action() erases node.last_error
        #             so we need to set it again here.
        failed_node.last_error = msg
        failed_node.save(task.context)
class AgentTFTPImageCache(image_cache.ImageCache):
    """ImageCache configured from the [pxe] TFTP cache options."""

    def __init__(self, image_service=None):
        pxe_conf = CONF.pxe
        master_dir = pxe_conf.tftp_master_path
        # Configured in MiB; the base class is given bytes.
        size_bytes = pxe_conf.image_cache_size * 1024 * 1024
        # Configured in minutes; the base class is given seconds.
        ttl_seconds = pxe_conf.image_cache_ttl * 60
        super(AgentTFTPImageCache, self).__init__(
            master_dir,
            size_bytes,
            ttl_seconds,
            image_service=image_service)
# copied from pxe driver - should be refactored per LP1350594
def _free_disk_space_for(path):
"""Get free disk space on a drive where path is located."""
stat = os.statvfs(path)
return stat.f_frsize * stat.f_bavail
# copied from pxe driver - should be refactored per LP1350594
def _cleanup_caches_if_required(ctx, cache, images_info):
    """Free cache space if the images to fetch would not fit on disk.

    :param ctx: context
    :param cache: ImageCache instance whose master_dir volume is checked
    :param images_info: list of tuples (image uuid, destination path)
    :raises: InstanceDeployFailure if there still isn't enough free space
             after cleaning the caches.
    """
    # NOTE(dtantsur): I'd prefer to have this code inside ImageCache. But:
    # To reclaim disk space efficiently, this code needs to be aware of
    # all existing caches (e.g. cleaning instance image cache can be
    # much more efficient, than cleaning TFTP cache).
    required = sum(images.download_size(ctx, image_uuid)
                   for (image_uuid, unused_path) in images_info)
    available = _free_disk_space_for(cache.master_dir)
    if required < available:
        # Enough room already; nothing to clean.
        return

    # NOTE(dtantsur): filter caches, whose directory is on the same device
    device = os.stat(cache.master_dir).st_dev
    same_device_caches = [candidate
                          for candidate in (AgentTFTPImageCache(),)
                          if os.stat(candidate.master_dir).st_dev == device]

    for candidate in same_device_caches:
        candidate.clean_up()
        available = _free_disk_space_for(cache.master_dir)
        if required < available:
            return

    # Every candidate cache was cleaned and the space is still short.
    msg = _("Disk volume where '%(path)s' is located doesn't have "
            "enough disk space. Required %(required)d MiB, "
            "only %(actual)d MiB available space present.")
    raise exception.InstanceDeployFailure(reason=msg % {
        'path': cache.master_dir,
        'required': required / 1024 / 1024,
        'actual': available / 1024 / 1024
    })
# copied from pxe driver - should be refactored per LP1350594
def _fetch_images(ctx, cache, images_info):
    """Make room on disk if needed, then fetch images through the cache.

    :param ctx: context
    :param cache: ImageCache instance to use for fetching
    :param images_info: list of tuples (image uuid, destination path)
    :raises: InstanceDeployFailure if unable to find enough disk space
    """
    _cleanup_caches_if_required(ctx, cache, images_info)
    # NOTE(dtantsur): This code can suffer from race condition,
    # if disk space is used between the check and actual download.
    # This is probably unavoidable, as we can't control other
    # (probably unrelated) processes
    for image_uuid, dest_path in images_info:
        cache.fetch_image(image_uuid, dest_path, ctx=ctx)
# copied from pxe driver - should be refactored per LP1350594
def _cache_tftp_images(ctx, node, pxe_info):
    """Fetch the deploy kernel and ramdisk for a node via the TFTP cache.

    :param ctx: context
    :param node: node whose UUID names the directory under the TFTP root
    :param pxe_info: mapping whose values are (image uuid, destination
                     path) pairs
    """
    node_tftp_dir = os.path.join(CONF.pxe.tftp_root, node.uuid)
    fileutils.ensure_tree(node_tftp_dir)
    LOG.debug("Fetching kernel and ramdisk for node %s",
              node.uuid)
    tftp_cache = AgentTFTPImageCache()
    _fetch_images(ctx, tftp_cache, pxe_info.values())
def _build_instance_info_for_deploy(task):
    """Build instance_info necessary for deploying to a node.

    Looks up the image named by instance_info['image_source'] through the
    image service and adds 'image_url' (a Swift temp URL) and
    'image_checksum' to the node's instance_info.

    :param task: a TaskManager instance.
    :returns: the updated instance_info dict.
    """
    node = task.node
    info = node.instance_info

    image_api = image_service.Service(version=2, context=task.context)
    image_meta = image_api.show(info['image_source'])
    temp_url = image_api.swift_temp_url(image_meta)
    LOG.debug('Got image info: %(info)s for node %(node)s.',
              {'info': image_meta, 'node': node.uuid})

    info['image_url'] = temp_url
    info['image_checksum'] = image_meta['checksum']
    return info
class AgentDeploy(base.DeployInterface):
    """Interface for deploy-related actions."""

    def get_properties(self):
        """Return the properties of the interface.

        :returns: dictionary of <property name>:<property description>
                  entries.
        """
        return {}

    def validate(self, task):
        """Validate the driver-specific Node deployment info.

        Checks that the deploy kernel/ramdisk information can be built
        from the node.

        :param task: a TaskManager instance
        :raises: InvalidParameterValue
        """
        try:
            _get_tftp_image_info(task.node)
        except KeyError:
            # The UUID must be interpolated into the message; handing it to
            # the exception as a second positional argument leaves the
            # '%s' placeholder unformatted.
            raise exception.InvalidParameterValue(_(
                'Node %s failed to validate deploy image info') %
                task.node.uuid)

    @task_manager.require_exclusive_lock
    def deploy(self, task):
        """Perform a deployment to a node.

        Updates the DHCP options in Neutron, sets the node to boot from
        PXE and reboots it. This method will be called after prepare(),
        which may have already performed any preparatory steps, such as
        pre-caching some data for the node.

        :param task: a TaskManager instance.
        :returns: status of the deploy. One of ironic.common.states.
        """
        dhcp_opts = pxe_utils.dhcp_options_for_instance()
        neutron.update_neutron(task, dhcp_opts)
        manager_utils.node_set_boot_device(task, 'pxe', persistent=True)
        manager_utils.node_power_action(task, states.REBOOT)

        return states.DEPLOYWAIT

    @task_manager.require_exclusive_lock
    def tear_down(self, task):
        """Tear down a previous deployment on the task's node.

        Powers the node off.

        :param task: a TaskManager instance.
        :returns: status of the deploy. One of ironic.common.states.
        """
        manager_utils.node_power_action(task, states.POWER_OFF)
        return states.DELETED

    def prepare(self, task):
        """Prepare the deployment environment for this node.

        Writes the PXE config, caches the deploy kernel and ramdisk, and
        stores the image URL/checksum in the node's instance_info.

        :param task: a TaskManager instance.
        """
        node = task.node
        pxe_info = _get_tftp_image_info(task.node)
        pxe_options = _build_pxe_config_options(pxe_info)
        pxe_utils.create_pxe_config(task,
                                    pxe_options,
                                    CONF.agent.agent_pxe_config_template)
        _cache_tftp_images(task.context, node, pxe_info)

        node.instance_info = _build_instance_info_for_deploy(task)
        node.save(task.context)

    def clean_up(self, task):
        """Clean up the deployment environment for this node.

        If preparation of the deployment environment ahead of time is
        possible, this method should be implemented by the driver. It
        should erase anything cached by the `prepare` method.

        If implemented, this method must be idempotent. It may be called
        multiple times for the same node on the same conductor, and it may
        be called by multiple conductors in parallel. Therefore, it must
        not require an exclusive lock.

        This method is called before `tear_down`.

        :param task: a TaskManager instance.
        """
        pxe_info = _get_tftp_image_info(task.node)
        for label in pxe_info:
            # Element [1] of each entry is the on-disk path of the image.
            path = pxe_info[label][1]
            utils.unlink_without_raise(path)
        AgentTFTPImageCache().clean_up()

        pxe_utils.clean_up_pxe_config(task)

    def take_over(self, task):
        """Take over management of this node from a dead conductor.

        If conductors' hosts maintain a static relationship to nodes, this
        method should be implemented by the driver to allow conductors to
        perform the necessary work during the remapping of nodes to
        conductors when a conductor joins or leaves the cluster.

        For example, the PXE driver has an external dependency:
            Neutron must forward DHCP BOOT requests to a conductor which
            has prepared the tftpboot environment for the given node. When
            a conductor goes offline, another conductor must change this
            setting in Neutron as part of remapping that node's control to
            itself. This is performed within the `takeover` method.

        :param task: a TaskManager instance.
        """
        # NOTE(review): deploy() hands update_neutron() the options from
        # pxe_utils.dhcp_options_for_instance(), while this call passes
        # the bootfile name string. Confirm which form update_neutron()
        # expects; left as-is because the existing unit test pins it.
        neutron.update_neutron(task, CONF.agent.agent_pxe_bootfile_name)
class AgentVendorInterface(base.VendorInterface):
    """Vendor passthru endpoints called by the ramdisk agent.

    Provides a driver-level 'lookup' route (for an agent that does not yet
    know its node UUID) and a node-level 'heartbeat' route, which also
    moves an in-progress deploy forward.
    """

    def __init__(self):
        self.vendor_routes = {
            'heartbeat': self._heartbeat
        }
        self.driver_routes = {
            'lookup': self._lookup,
        }
        self.supported_payload_versions = ['2']
        self.dbapi = dbapi.get_instance()
        self._client = _get_client()

    def get_properties(self):
        """Return the properties of the interface.

        :returns: dictionary of <property name>:<property description>
                  entries.
        """
        # NOTE(jroll) all properties are set by the driver,
        #             not by the operator.
        return {}

    def validate(self, task, **kwargs):
        """Validate the driver-specific Node deployment info.

        No validation necessary.

        :param task: a TaskManager instance
        """
        pass

    def driver_vendor_passthru(self, task, method, **kwargs):
        """A node that does not know its UUID should POST to this method.

        Given method, route the command to the appropriate private
        function.

        :raises: InvalidParameterValue if no handler exists for method.
        """
        if method not in self.driver_routes:
            raise exception.InvalidParameterValue(
                _('No handler for method %s') % method)
        func = self.driver_routes[method]
        return func(task, **kwargs)

    def vendor_passthru(self, task, **kwargs):
        """A node that knows its UUID should heartbeat to this passthru.

        Routes kwargs['method'] to the matching private handler. Any
        exception raised by the handler is logged and re-raised.

        :raises: InvalidParameterValue if no handler exists for the method.
        """
        method = kwargs['method']  # Existence checked in mixin
        if method not in self.vendor_routes:
            raise exception.InvalidParameterValue(
                _('No handler for method %s') % method)
        func = self.vendor_routes[method]
        try:
            return func(task, **kwargs)
        except Exception:
            # catch-all in case something bubbles up here
            with excutils.save_and_reraise_exception():
                LOG.exception(_('vendor_passthru failed with method %s'),
                              method)

    def _heartbeat(self, task, **kwargs):
        """Method for agent to periodically check in.

        The agent should be sending its agent_url (so Ironic can talk back)
        as a kwarg.

        kwargs should have the following format:
        {
            'agent_url': 'http://AGENT_HOST:AGENT_PORT'
        }
        AGENT_PORT defaults to 9999.

        The heartbeat time and agent URL are saved in driver_info. A node
        in DEPLOYWAIT then has its deploy continued, and a node in
        DEPLOYING whose deploy has finished is rebooted to its instance.
        A failure in either step puts the node into the failed state.
        """
        node = task.node
        driver_info = node.driver_info
        LOG.debug(
            'Heartbeat from %(node)s, last heartbeat at %(heartbeat)s.',
            {'node': node.uuid,
             'heartbeat': driver_info.get('agent_last_heartbeat')})
        driver_info['agent_last_heartbeat'] = int(_time())
        driver_info['agent_url'] = kwargs['agent_url']
        node.driver_info = driver_info
        node.save(task.context)

        # Async call backs don't set error state on their own
        # TODO(jimrollenhagen) improve error messages here
        # msg needs a value before the try block: _deploy_is_done() is
        # evaluated in the elif condition, before any branch assigns msg,
        # and the handler below reads msg.
        msg = _('Failed checking if deploy is done.')
        try:
            if node.provision_state == states.DEPLOYWAIT:
                msg = _('Node failed to get image for deploy.')
                self._continue_deploy(task, **kwargs)
            elif (node.provision_state == states.DEPLOYING
                    and self._deploy_is_done(node)):
                msg = _('Node failed to move to active state.')
                self._reboot_to_instance(task, **kwargs)
        except Exception:
            LOG.exception('Async exception for %(node)s: %(msg)s',
                          {'node': node.uuid,
                           'msg': msg})
            _set_failed_state(task, msg)

    def _deploy_is_done(self, node):
        """Ask the agent client whether the node's deploy has finished."""
        return self._client.deploy_is_done(node)

    @task_manager.require_exclusive_lock
    def _continue_deploy(self, task, **kwargs):
        """Tell the agent to fetch and write the image; mark DEPLOYING."""
        node = task.node
        image_source = node.instance_info.get('image_source')
        LOG.debug('Continuing deploy for %s', node.uuid)

        image_info = {
            'id': image_source,
            'urls': [node.instance_info['image_url']],
            'checksum': node.instance_info['image_checksum'],
        }

        # Tell the client to download and write the image with the given
        # args
        res = self._client.prepare_image(node, image_info)
        LOG.debug('prepare_image got response %(res)s for node %(node)s',
                  {'res': res, 'node': node.uuid})

        node.provision_state = states.DEPLOYING
        node.save(task.context)

    def _check_deploy_success(self, node):
        """Return the last command's error if it failed, else None."""
        # should only ever be called after we've validated that
        # the prepare_image command is complete
        command = self._client.get_commands_status(node)[-1]
        if command['command_status'] == 'FAILED':
            return command['command_error']

    def _reboot_to_instance(self, task, **kwargs):
        """Reboot a deployed node to disk and mark it ACTIVE.

        If the agent reports that the last command failed, the node is
        put into the failed state instead.
        """
        node = task.node
        LOG.debug('Preparing to reboot to instance for node %s',
                  node.uuid)
        error = self._check_deploy_success(node)
        if error is not None:
            # TODO(jimrollenhagen) power off if using neutron dhcp to
            #                      align with pxe driver?
            msg = _('node %(node)s command status errored: %(error)s') % (
                {'node': node.uuid, 'error': error})
            LOG.error(msg)
            _set_failed_state(task, msg)
            return

        LOG.debug('Rebooting node %s to disk', node.uuid)

        manager_utils.node_set_boot_device(task, 'disk', persistent=True)
        manager_utils.node_power_action(task, states.REBOOT)

        node.provision_state = states.ACTIVE
        node.target_provision_state = states.NOSTATE
        node.save(task.context)

    def _lookup(self, context, **kwargs):
        """Method to be called the first time a ramdisk agent checks in.

        This can be because this is a node just entering decom or a node
        that rebooted for some reason. We will use the mac addresses listed
        in the kwargs to find the matching node, then return the node
        object to the agent. The agent can then use that UUID to use the
        normal vendor passthru method.

        Currently, we don't handle the instance where the agent doesn't
        have a matching node (i.e. a brand new, never been in Ironic node).

        kwargs should have the following format:
        {
            "version": "2",
            "inventory": {
                "interfaces": [
                    {
                        "name": "eth0",
                        "mac_address": "00:11:22:33:44:55",
                        "switch_port_descr": "port24",
                        "switch_chassis_descr": "tor1"
                    },
                    ...
                ], ...
            }
        }

        The interfaces list should include a list of the non-IPMI MAC
        addresses in the form aa:bb:cc:dd:ee:ff.

        This method will also return the timeout for heartbeats. The driver
        will expect the agent to heartbeat before that timeout, or it will
        be considered down. This will be in a root level key called
        'heartbeat_timeout'

        :returns: dict with 'heartbeat_timeout' and 'node' keys.
        :raises: NotFound if no matching node is found.
        :raises: InvalidParameterValue with unknown payload version
        """
        version = kwargs.get('version')

        if version not in self.supported_payload_versions:
            raise exception.InvalidParameterValue(_('Unknown lookup payload '
                                                    'version: %s') % version)
        interfaces = self._get_interfaces(version, kwargs)
        mac_addresses = self._get_mac_addresses(interfaces)

        node = self._find_node_by_macs(context, mac_addresses)

        LOG.debug('Initial lookup for node %s succeeded.', node.uuid)

        # NOTE: a 'hardware' dict used to be assembled here from the
        # payload but was never stored or returned, so it was dropped.
        return {
            'heartbeat_timeout': CONF.agent.heartbeat_timeout,
            'node': node
        }

    def _get_interfaces(self, version, inventory):
        """Extract the interfaces list from a lookup payload.

        :raises: InvalidParameterValue if the payload has no
                 ['inventory']['interfaces'] entry.
        """
        try:
            interfaces = inventory['inventory']['interfaces']
        except (KeyError, TypeError):
            raise exception.InvalidParameterValue(_(
                'Malformed network interfaces lookup: %s') % inventory)

        return interfaces

    def _get_mac_addresses(self, interfaces):
        """Return the normalized MACs of the given network interfaces.

        Interfaces whose MAC fails validation are logged and skipped.
        """
        mac_addresses = []

        for interface in interfaces:
            try:
                mac_addresses.append(utils.validate_and_normalize_mac(
                    interface.get('mac_address')))
            except exception.InvalidMAC:
                LOG.warning(_LW('Malformed MAC: %s'), interface.get(
                    'mac_address'))
        return mac_addresses

    def _find_node_by_macs(self, context, mac_addresses):
        """Find the node that the ports matching the given MACs belong to.

        :raises: NodeNotFound if the ports point to multiple nodes or no
                 nodes.
        """
        ports = self._find_ports_by_macs(context, mac_addresses)
        if not ports:
            raise exception.NodeNotFound(_(
                'No ports matching the given MAC addresses %s exist in the '
                'database.') % mac_addresses)
        node_id = self._get_node_id(ports)
        try:
            node = node_module.Node.get_by_id(context, node_id)
        except exception.NodeNotFound:
            with excutils.save_and_reraise_exception():
                LOG.exception(_('Could not find matching node for the '
                                'provided MACs %s.'), mac_addresses)
        return node

    def _find_ports_by_macs(self, context, mac_addresses):
        """Return the Port objects matching the given MAC addresses.

        MACs without a port in the database are logged and skipped, so the
        result may be an empty list.
        """
        ports = []
        for mac in mac_addresses:
            # Will do a search by mac if the mac isn't malformed
            try:
                # TODO(JoshNang) add port.get_by_mac() to Ironic
                # port.get_by_uuid() would technically work but shouldn't.
                port_ob = self.dbapi.get_port(port_id=mac)
                ports.append(port_ob)

            except exception.PortNotFound:
                LOG.warning(_LW('MAC address %s not found in database'), mac)

        return ports

    def _get_node_id(self, ports):
        """Return the node_id shared by all the given ports.

        :raises: NodeNotFound if the MACs match multiple nodes. This
                 could happen if you swapped a NIC from one server to
                 another and don't notify Ironic about it or there is a
                 MAC collision (since they're not guaranteed to be unique).
        """
        # See if all the ports point to the same node
        node_ids = set(port_ob.node_id for port_ob in ports)
        if len(node_ids) > 1:
            raise exception.NodeNotFound(_(
                'Ports matching mac addresses match multiple nodes. MACs: '
                '%(macs)s. Port ids: %(port_ids)s') %
                {'macs': [port_ob.address for port_ob in ports],
                 'port_ids': [port_ob.uuid for port_ob in ports]}
            )

        # Only have one node_id left, return it.
        return node_ids.pop()

View File

@ -0,0 +1,104 @@
# Copyright 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
import requests
from ironic.common import exception
from ironic.openstack.common import jsonutils
from ironic.openstack.common import log
# Configuration options owned by the agent client. They are registered
# below under the [agent] group, so they are read as CONF.agent.<name>.
agent_opts = [
cfg.StrOpt('agent_api_version',
default='v1',
help='API version to use for communicating with the ramdisk '
'agent.')
]
CONF = cfg.CONF
CONF.register_opts(agent_opts, group='agent')
# Module-level logger named after this module.
LOG = log.getLogger(__name__)
class AgentClient(object):
    """Client for interacting with nodes via a REST API."""

    def __init__(self):
        self.session = requests.Session()

    def _get_command_url(self, node):
        """Build the agent's commands URL from the node's driver_info.

        :raises: IronicException if driver_info has no 'agent_url'.
        """
        if 'agent_url' not in node.driver_info:
            raise exception.IronicException(_('Agent driver requires '
                                              'agent_url in driver_info'))
        url_parts = {
            'agent_url': node.driver_info['agent_url'],
            'api_version': CONF.agent.agent_api_version,
        }
        return '%(agent_url)s/%(api_version)s/commands' % url_parts

    def _get_command_body(self, method, params):
        """Serialize a command name and its params to a JSON string."""
        payload = {
            'name': method,
            'params': params,
        }
        return jsonutils.dumps(payload)

    def _command(self, node, method, params, wait=False):
        """POST a command to the node's agent and return the JSON reply."""
        url = self._get_command_url(node)
        body = self._get_command_body(method, params)
        # The 'wait' flag is sent as a lowercase 'true'/'false' string.
        query = {'wait': str(wait).lower()}
        json_headers = {'Content-Type': 'application/json'}
        response = self.session.post(url,
                                     params=query,
                                     data=body,
                                     headers=json_headers)

        # TODO(russellhaering): real error handling
        return response.json()

    def get_commands_status(self, node):
        """Return the 'commands' list reported by the node's agent."""
        url = self._get_command_url(node)
        json_headers = {'Content-Type': 'application/json'}
        reply = self.session.get(url, headers=json_headers)
        return reply.json()['commands']

    def deploy_is_done(self, node):
        """Return True once the agent's last command has stopped running.

        False is returned when the agent reports no commands, when the
        last command is not named 'prepare_image', or while it is still
        RUNNING.
        """
        commands = self.get_commands_status(node)
        if not commands:
            return False

        latest = commands[-1]
        # A last command other than prepare_image catches the race
        # condition where prepare_image is still processing, so the deploy
        # hasn't started yet.
        return (latest['command_name'] == 'prepare_image' and
                latest['command_status'] != 'RUNNING')

    def prepare_image(self, node, image_info, wait=False):
        """Call the `prepare_image` method on the node."""
        LOG.debug('Preparing image %(image)s on node %(node)s.',
                  {'image': image_info.get('id'),
                   'node': self._get_command_url(node)})
        command_params = {
            'image_info': image_info,
        }
        return self._command(node=node,
                             method='standby.prepare_image',
                             params=command_params,
                             wait=wait)

View File

@ -0,0 +1,5 @@
default deploy
label deploy
kernel {{ pxe_options.deployment_aki_path }}
append initrd={{ pxe_options.deployment_ari_path }} {{ pxe_options.pxe_append_params }} {% if pxe_options.ipa_api_url %}ipa-api-url={{ pxe_options.ipa_api_url }}{% endif %} {% if pxe_options.ipa_advertise_host %}ipa-advertise-host={{ pxe_options.ipa_advertise_host }}{% endif %}

View File

@ -36,7 +36,7 @@ def create(node):
:returns: GenericDriverFields or a subclass thereof, as appropriate
for the supplied node.
"""
if 'pxe' in node.driver:
if 'pxe' in node.driver or 'agent' in node.driver:
return PXEDriverFields(node)
else:
return GenericDriverFields(node)

View File

@ -78,6 +78,22 @@ def get_test_ilo_info():
}
def get_test_agent_instance_info():
    """Return a sample instance_info dict for agent driver tests."""
    instance_info = {}
    instance_info['image_source'] = 'fake-image'
    instance_info['image_url'] = 'http://image'
    instance_info['image_checksum'] = 'checksum'
    return instance_info
def get_test_agent_driver_info():
    """Return a sample driver_info dict for agent driver tests."""
    driver_info = {}
    driver_info['agent_url'] = 'http://127.0.0.1/foo'
    driver_info['deploy_kernel'] = 'glance://deploy_kernel_uuid'
    driver_info['deploy_ramdisk'] = 'glance://deploy_ramdisk_uuid'
    return driver_info
def get_test_node(**kw):
properties = {
"cpu_arch": "x86_64",

View File

@ -0,0 +1,5 @@
default deploy
label deploy
kernel {{ pxe_options.deployment_aki_path }}
append initrd={{ pxe_options.deployment_ari_path }} root=squashfs: {% if pxe_options.pxe_append_params %}{{ pxe_options.pxe_append_params }}{% endif %} state=tmpfs: ipa-api-url={{ pxe_options.ipa_api_url }} {% if pxe_options.ipa_advertise_host %}ipa-advertise-host={{ pxe_options.ipa_advertise_host }}{% endif %}

View File

@ -0,0 +1,306 @@
# Copyright 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo.config import cfg
from ironic.common import exception
from ironic.common import neutron
from ironic.common import pxe_utils
from ironic.common import states
from ironic.conductor import task_manager
from ironic.db import api as dbapi
from ironic.drivers.modules import agent
from ironic.openstack.common import context
from ironic.tests.conductor import utils as mgr_utils
from ironic.tests.db import base as db_base
from ironic.tests.db import utils as db_utils
from ironic.tests.objects import utils as object_utils
# Sample instance_info / driver_info dicts used to create the test nodes
# in this module.
INSTANCE_INFO = db_utils.get_test_agent_instance_info()
DRIVER_INFO = db_utils.get_test_agent_driver_info()
CONF = cfg.CONF
class TestAgentDeploy(db_base.DbTestCase):
    """Tests for agent.AgentDeploy using a node on the fake_agent driver."""

    def setUp(self):
        super(TestAgentDeploy, self).setUp()
        mgr_utils.mock_the_extension_manager(driver='fake_agent')
        self.dbapi = dbapi.get_instance()
        self.driver = agent.AgentDeploy()
        self.context = context.get_admin_context()
        self.node = object_utils.create_test_node(
            self.context,
            driver='fake_agent',
            instance_info=INSTANCE_INFO,
            driver_info=DRIVER_INFO)

    def _create_test_port(self, **kwargs):
        """Create a test port in the database and return it."""
        port_values = db_utils.get_test_port(**kwargs)
        return self.dbapi.create_port(port_values)

    def test_validate(self):
        """validate() passes for a node carrying the test driver_info."""
        with task_manager.acquire(
                self.context, self.node['uuid'], shared=False) as task:
            self.driver.validate(task)

    @mock.patch.object(neutron, 'update_neutron')
    @mock.patch('ironic.conductor.utils.node_set_boot_device')
    @mock.patch('ironic.conductor.utils.node_power_action')
    def test_deploy(self, power_mock, bootdev_mock, neutron_mock):
        """deploy() updates DHCP, sets PXE boot, reboots, then waits."""
        expected_dhcp_opts = pxe_utils.dhcp_options_for_instance()
        with task_manager.acquire(
                self.context, self.node['uuid'], shared=False) as task:
            result = self.driver.deploy(task)
            self.assertEqual(result, states.DEPLOYWAIT)
            neutron_mock.assert_called_once_with(task, expected_dhcp_opts)
            bootdev_mock.assert_called_once_with(
                task, 'pxe', persistent=True)
            power_mock.assert_called_once_with(task, states.REBOOT)

    @mock.patch('ironic.conductor.utils.node_power_action')
    def test_tear_down(self, power_mock):
        """tear_down() powers the node off and returns DELETED."""
        with task_manager.acquire(
                self.context, self.node['uuid'], shared=False) as task:
            result = self.driver.tear_down(task)
            power_mock.assert_called_once_with(task, states.POWER_OFF)
            self.assertEqual(result, states.DELETED)

    def test_prepare(self):
        # Placeholder: no assertions yet.
        pass

    def test_clean_up(self):
        # Placeholder: no assertions yet.
        pass

    @mock.patch.object(neutron, 'update_neutron')
    def test_take_over(self, update_neutron_mock):
        """take_over() calls update_neutron with the configured bootfile."""
        with task_manager.acquire(
                self.context, self.node['uuid'], shared=True) as task:
            task.driver.deploy.take_over(task)
            update_neutron_mock.assert_called_once_with(
                task, CONF.agent.agent_pxe_bootfile_name)
class TestAgentVendor(db_base.DbTestCase):
    """Tests for ``agent.AgentVendorInterface`` using a ``fake_pxe`` node."""

    def setUp(self):
        """Register the fake_pxe driver and create the node under test."""
        super(TestAgentVendor, self).setUp()
        mgr_utils.mock_the_extension_manager(driver="fake_pxe")
        self.dbapi = dbapi.get_instance()
        self.passthru = agent.AgentVendorInterface()
        # Mock() has no ``autospec`` argument; passing one only creates an
        # attribute of that name, so a plain Mock is what was really in use.
        self.passthru.db_connection = mock.Mock()
        self.context = context.get_admin_context()
        node_attrs = {
            'driver': 'fake_pxe',
            'instance_info': INSTANCE_INFO,
            'driver_info': DRIVER_INFO
        }
        self.node = object_utils.create_test_node(self.context, **node_attrs)

    def _create_test_port(self, **kwargs):
        """Create a port through the DB API and return the created port.

        :param kwargs: passed through to ``db_utils.get_test_port``.
        """
        port = db_utils.get_test_port(**kwargs)
        return self.dbapi.create_port(port)

    def test_validate(self):
        """validate() must not raise for the node created in setUp()."""
        with task_manager.acquire(self.context, self.node.uuid) as task:
            self.passthru.validate(task)

    @mock.patch('ironic.common.image_service.Service')
    def test_continue_deploy(self, image_service_mock):
        """_continue_deploy() calls prepare_image and sets DEPLOYING."""
        test_temp_url = 'http://image'
        expected_image_info = {
            'urls': [test_temp_url],
            'id': 'fake-image',
            'checksum': 'checksum'
        }

        client_mock = mock.Mock()
        glance_mock = mock.Mock()
        glance_mock.show.return_value = {}
        glance_mock.swift_temp_url.return_value = test_temp_url
        image_service_mock.return_value = glance_mock

        self.passthru._client = client_mock
        with task_manager.acquire(self.context, self.node.uuid,
                                  shared=False) as task:
            self.passthru._continue_deploy(task)

            client_mock.prepare_image.assert_called_with(task.node,
                                                         expected_image_info)
            # Expected value first, as in the other assertions here.
            self.assertEqual(states.DEPLOYING, task.node.provision_state)

    def test_lookup_version_not_found(self):
        """_lookup() with version '999' raises InvalidParameterValue."""
        kwargs = {
            'version': '999',
        }
        with task_manager.acquire(self.context, self.node.uuid) as task:
            self.assertRaises(exception.InvalidParameterValue,
                              self.passthru._lookup,
                              task.context,
                              **kwargs)

    @mock.patch('ironic.drivers.modules.agent.AgentVendorInterface'
                '._find_node_by_macs')
    def test_lookup_v2(self, find_mock):
        """_lookup() version '2' returns the node found by MAC address."""
        kwargs = {
            'version': '2',
            'inventory': {
                'interfaces': [
                    {
                        'mac_address': 'aa:bb:cc:dd:ee:ff',
                        'name': 'eth0'
                    },
                    {
                        'mac_address': 'ff:ee:dd:cc:bb:aa',
                        'name': 'eth1'
                    }
                ]
            }
        }
        find_mock.return_value = self.node
        with task_manager.acquire(self.context, self.node.uuid) as task:
            node = self.passthru._lookup(task.context, **kwargs)
            self.assertEqual(self.node, node['node'])

    def test_lookup_v2_missing_inventory(self):
        """_lookup() without any kwargs raises InvalidParameterValue."""
        with task_manager.acquire(self.context, self.node.uuid) as task:
            self.assertRaises(exception.InvalidParameterValue,
                              self.passthru._lookup,
                              task.context)

    def test_lookup_v2_empty_inventory(self):
        """_lookup() with an empty inventory raises InvalidParameterValue."""
        with task_manager.acquire(self.context, self.node.uuid) as task:
            self.assertRaises(exception.InvalidParameterValue,
                              self.passthru._lookup,
                              task.context,
                              inventory={})

    def test_lookup_v2_empty_interfaces(self):
        """_lookup() with an empty interface list raises NodeNotFound."""
        with task_manager.acquire(self.context, self.node.uuid) as task:
            self.assertRaises(exception.NodeNotFound,
                              self.passthru._lookup,
                              task.context,
                              version='2',
                              inventory={'interfaces': []})

    def test_find_ports_by_macs(self):
        """_find_ports_by_macs() returns the port the DB API hands back."""
        fake_port = self._create_test_port()

        macs = ['aa:bb:cc:dd:ee:ff']

        self.passthru.dbapi = mock.Mock()
        self.passthru.dbapi.get_port.return_value = fake_port

        with task_manager.acquire(
                self.context, self.node['uuid'], shared=True) as task:
            ports = self.passthru._find_ports_by_macs(task, macs)
            self.assertEqual(1, len(ports))
            self.assertEqual(fake_port.uuid, ports[0].uuid)
            self.assertEqual(fake_port.node_id, ports[0].node_id)

    def test_find_ports_by_macs_bad_params(self):
        """_find_ports_by_macs() returns [] when get_port raises."""
        self.passthru.dbapi = mock.Mock()
        self.passthru.dbapi.get_port.side_effect = exception.PortNotFound(
            port="123")

        macs = ['aa:bb:cc:dd:ee:ff']
        with task_manager.acquire(
                self.context, self.node['uuid'], shared=True) as task:
            empty_ids = self.passthru._find_ports_by_macs(task, macs)
            self.assertEqual([], empty_ids)

    # NOTE: mock.patch decorators are applied bottom-up, so the mocks arrive
    # in the order ports, node id, node.
    @mock.patch('ironic.objects.node.Node.get_by_id')
    @mock.patch('ironic.drivers.modules.agent.AgentVendorInterface'
                '._get_node_id')
    @mock.patch('ironic.drivers.modules.agent.AgentVendorInterface'
                '._find_ports_by_macs')
    def test_find_node_by_macs(self, ports_mock, node_id_mock, node_mock):
        """_find_node_by_macs() returns the node that get_by_id yields."""
        ports_mock.return_value = [self._create_test_port()]
        node_id_mock.return_value = '1'
        node_mock.return_value = self.node

        macs = ['aa:bb:cc:dd:ee:ff']
        with task_manager.acquire(
                self.context, self.node['uuid'], shared=True) as task:
            node = self.passthru._find_node_by_macs(task, macs)
            # Compare against the expected node; the previous
            # assertEqual(node, node) could never fail.
            self.assertEqual(self.node, node)

    @mock.patch('ironic.drivers.modules.agent.AgentVendorInterface'
                '._find_ports_by_macs')
    def test_find_node_by_macs_no_ports(self, ports_mock):
        """_find_node_by_macs() raises NodeNotFound when no port matches."""
        ports_mock.return_value = []

        macs = ['aa:bb:cc:dd:ee:ff']
        with task_manager.acquire(
                self.context, self.node['uuid'], shared=True) as task:
            self.assertRaises(exception.NodeNotFound,
                              self.passthru._find_node_by_macs,
                              task,
                              macs)

    @mock.patch('ironic.objects.node.Node.get_by_uuid')
    @mock.patch('ironic.drivers.modules.agent.AgentVendorInterface'
                '._get_node_id')
    @mock.patch('ironic.drivers.modules.agent.AgentVendorInterface'
                '._find_ports_by_macs')
    def test_find_node_by_macs_nodenotfound(self, ports_mock, node_id_mock,
                                            node_mock):
        """_find_node_by_macs() raises NodeNotFound for a missing node."""
        port = self._create_test_port()
        ports_mock.return_value = [port]
        node_id_mock.return_value = self.node['uuid']
        # First get_by_uuid call succeeds, the second raises NodeNotFound.
        node_mock.side_effect = [self.node,
                                 exception.NodeNotFound(node=self.node)]

        macs = ['aa:bb:cc:dd:ee:ff']
        with task_manager.acquire(
                self.context, self.node['uuid'], shared=True) as task:
            self.assertRaises(exception.NodeNotFound,
                              self.passthru._find_node_by_macs,
                              task,
                              macs)

    def test_get_node_id(self):
        """_get_node_id() returns the node id shared by both ports."""
        fake_port1 = self._create_test_port(node_id=123,
                                            address="aa:bb:cc:dd:ee:fe")
        fake_port2 = self._create_test_port(
            node_id=123,
            id=42,
            address="aa:bb:cc:dd:ee:fb",
            uuid='1be26c0b-03f2-4d2e-ae87-c02d7f33c782')

        node_id = self.passthru._get_node_id([fake_port1, fake_port2])
        self.assertEqual(fake_port2.node_id, node_id)

    def test_get_node_id_exception(self):
        """_get_node_id() raises NodeNotFound when the node ids differ."""
        fake_port1 = self._create_test_port(node_id=123,
                                            address="aa:bb:cc:dd:ee:fc")
        fake_port2 = self._create_test_port(
            node_id=321,
            id=42,
            address="aa:bb:cc:dd:ee:fd",
            uuid='1be26c0b-03f2-4d2e-ae87-c02d7f33c782')

        self.assertRaises(exception.NodeNotFound,
                          self.passthru._get_node_id,
                          [fake_port1, fake_port2])

    def test_heartbeat(self):
        """_heartbeat() accepts an agent_url without raising."""
        kwargs = {
            'agent_url': 'http://127.0.0.1:9999/bar'
        }
        with task_manager.acquire(
                self.context, self.node['uuid'], shared=True) as task:
            self.passthru._heartbeat(task, **kwargs)

View File

@ -0,0 +1,133 @@
# Copyright 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import requests
import mock
from ironic.common import exception
from ironic.drivers.modules import agent_client
from ironic.tests import base
class MockResponse(object):
    """Minimal stand-in for an HTTP response carrying a JSON payload.

    ``text`` holds the payload serialized with ``json.dumps``; ``data`` and
    ``json()`` give back the original object that was passed in.
    """

    def __init__(self, data):
        # Serialize once up front so ``text`` is a plain attribute.
        self.text = json.dumps(data)
        self.data = data

    def json(self):
        """Return the payload object given at construction time."""
        payload = self.data
        return payload


class MockNode(object):
    """Bare node stand-in exposing only a ``driver_info`` dict."""

    def __init__(self):
        # A fresh dict per instance, so a test may delete keys freely.
        self.driver_info = dict(agent_url="http://127.0.0.1:9999")


class TestAgentClient(base.TestCase):
    """Unit tests for ``agent_client.AgentClient`` using a mocked session."""

    def setUp(self):
        """Create a client whose HTTP session is replaced by a mock."""
        super(TestAgentClient, self).setUp()
        self.client = agent_client.AgentClient()
        # ``spec`` (not ``autospec``, which Mock would merely store as an
        # attribute) restricts the mock to the real requests.Session API.
        self.client.session = mock.Mock(spec=requests.Session)
        self.node = MockNode()

    def test__get_command_url(self):
        """The command URL is the node's agent_url plus '/v1/commands'."""
        command_url = self.client._get_command_url(self.node)
        expected = self.node.driver_info['agent_url'] + '/v1/commands'
        self.assertEqual(expected, command_url)

    def test__get_command_url_fail(self):
        """A node without agent_url makes _get_command_url() raise."""
        del self.node.driver_info['agent_url']
        self.assertRaises(exception.IronicException,
                          self.client._get_command_url,
                          self.node)

    def test__get_command_body(self):
        """The command body is the JSON dump of the name and params."""
        expected = json.dumps({'name': 'prepare_image', 'params': {}})
        self.assertEqual(expected,
                         self.client._get_command_body('prepare_image', {}))

    def test__command(self):
        """_command() POSTs the command and returns the decoded JSON."""
        response_data = {'status': 'ok'}
        self.client.session.post.return_value = MockResponse(response_data)
        method = 'standby.run_image'
        image_info = {'image_id': 'test_image'}
        params = {'image_info': image_info}

        url = self.client._get_command_url(self.node)
        body = self.client._get_command_body(method, params)
        headers = {'Content-Type': 'application/json'}

        response = self.client._command(self.node, method, params)
        # Expected value first, as in the other assertions in this class.
        self.assertEqual(response_data, response)
        self.client.session.post.assert_called_once_with(
            url,
            data=body,
            headers=headers,
            params={'wait': 'false'})

    def test_get_commands_status(self):
        """get_commands_status() returns the 'commands' list of the reply."""
        with mock.patch.object(self.client.session, 'get') as mock_get:
            res = mock.Mock()
            res.json.return_value = {'commands': []}
            mock_get.return_value = res
            self.assertEqual([], self.client.get_commands_status(self.node))

    def test_deploy_is_done(self):
        """A prepare_image command with status SUCCESS means done."""
        with mock.patch.object(self.client, 'get_commands_status') as mock_s:
            mock_s.return_value = [{
                'command_name': 'prepare_image',
                'command_status': 'SUCCESS'
            }]
            self.assertTrue(self.client.deploy_is_done(self.node))

    def test_deploy_is_done_empty_response(self):
        """An empty command list means the deploy is not done."""
        with mock.patch.object(self.client, 'get_commands_status') as mock_s:
            mock_s.return_value = []
            self.assertFalse(self.client.deploy_is_done(self.node))

    def test_deploy_is_done_race(self):
        """A successful command other than prepare_image is not done."""
        with mock.patch.object(self.client, 'get_commands_status') as mock_s:
            mock_s.return_value = [{
                'command_name': 'some_other_command',
                'command_status': 'SUCCESS'
            }]
            self.assertFalse(self.client.deploy_is_done(self.node))

    def test_deploy_is_done_still_running(self):
        """A prepare_image command with status RUNNING is not done."""
        with mock.patch.object(self.client, 'get_commands_status') as mock_s:
            mock_s.return_value = [{
                'command_name': 'prepare_image',
                'command_status': 'RUNNING'
            }]
            self.assertFalse(self.client.deploy_is_done(self.node))

    @mock.patch('uuid.uuid4', mock.MagicMock(return_value='uuid'))
    def test_prepare_image(self):
        """prepare_image() forwards to _command as standby.prepare_image."""
        self.client._command = mock.Mock()
        image_info = {'image_id': 'image'}
        params = {
            'image_info': image_info,
        }

        self.client.prepare_image(self.node,
                                  image_info,
                                  wait=False)
        self.client._command.assert_called_once_with(
            node=self.node,
            method='standby.prepare_image',
            params=params,
            wait=False)

View File

@ -30,7 +30,11 @@ console_scripts =
ironic-rootwrap = oslo.rootwrap.cmd:main
ironic.drivers =
agent_ipmitool = ironic.drivers.agent:AgentAndIPMIToolDriver
agent_pyghmi = ironic.drivers.agent:AgentAndIPMINativeDriver
agent_ssh = ironic.drivers.agent:AgentAndSSHDriver
fake = ironic.drivers.fake:FakeDriver
fake_agent = ironic.drivers.fake:FakeAgentDriver
fake_ipmitool = ironic.drivers.fake:FakeIPMIToolDriver
fake_ipminative = ironic.drivers.fake:FakeIPMINativeDriver
fake_ssh = ironic.drivers.fake:FakeSSHDriver