Browse Source

Remove ironic_tempest_plugin/ directory

We now use the project openstack/ironic-tempest-plugin to store our
tempest plugin. All content from the ironic_tempest_plugin/ directory
has been ported to that project.

We no longer want to have the plugin content stored here so we delete
it.

Remove check in tools/flake8wrap.sh that prevented changes to the
ironic_tempest_plugin/ directory.

Change-Id: I700bd7b71472fa91f6bc02aebc055584df08e0ef
changes/33/527733/15
John L. Villalovos 4 years ago
parent
commit
7e1287f243
  1. 3
      doc/source/conf.py
  2. 5
      ironic_tempest_plugin/PLUGIN-MOVED
  3. 15
      ironic_tempest_plugin/README.rst
  4. 0
      ironic_tempest_plugin/__init__.py
  5. 53
      ironic_tempest_plugin/clients.py
  6. 0
      ironic_tempest_plugin/common/__init__.py
  7. 33
      ironic_tempest_plugin/common/utils.py
  8. 112
      ironic_tempest_plugin/common/waiters.py
  9. 116
      ironic_tempest_plugin/config.py
  10. 559
      ironic_tempest_plugin/manager.py
  11. 46
      ironic_tempest_plugin/plugin.py
  12. 0
      ironic_tempest_plugin/services/__init__.py
  13. 0
      ironic_tempest_plugin/services/baremetal/__init__.py
  14. 264
      ironic_tempest_plugin/services/baremetal/base.py
  15. 0
      ironic_tempest_plugin/services/baremetal/v1/__init__.py
  16. 0
      ironic_tempest_plugin/services/baremetal/v1/json/__init__.py
  17. 641
      ironic_tempest_plugin/services/baremetal/v1/json/baremetal_client.py
  18. 0
      ironic_tempest_plugin/tests/__init__.py
  19. 0
      ironic_tempest_plugin/tests/api/__init__.py
  20. 0
      ironic_tempest_plugin/tests/api/admin/__init__.py
  21. 26
      ironic_tempest_plugin/tests/api/admin/api_microversion_fixture.py
  22. 351
      ironic_tempest_plugin/tests/api/admin/base.py
  23. 43
      ironic_tempest_plugin/tests/api/admin/test_api_discovery.py
  24. 83
      ironic_tempest_plugin/tests/api/admin/test_chassis.py
  25. 54
      ironic_tempest_plugin/tests/api/admin/test_drivers.py
  26. 403
      ironic_tempest_plugin/tests/api/admin/test_nodes.py
  27. 193
      ironic_tempest_plugin/tests/api/admin/test_nodestates.py
  28. 74
      ironic_tempest_plugin/tests/api/admin/test_portgroups.py
  29. 376
      ironic_tempest_plugin/tests/api/admin/test_ports.py
  30. 463
      ironic_tempest_plugin/tests/api/admin/test_ports_negative.py
  31. 227
      ironic_tempest_plugin/tests/api/admin/test_volume_connector.py
  32. 210
      ironic_tempest_plugin/tests/api/admin/test_volume_target.py
  33. 0
      ironic_tempest_plugin/tests/scenario/__init__.py
  34. 231
      ironic_tempest_plugin/tests/scenario/baremetal_manager.py
  35. 339
      ironic_tempest_plugin/tests/scenario/baremetal_standalone_manager.py
  36. 0
      ironic_tempest_plugin/tests/scenario/ironic_standalone/__init__.py
  37. 145
      ironic_tempest_plugin/tests/scenario/ironic_standalone/test_basic_ops.py
  38. 146
      ironic_tempest_plugin/tests/scenario/test_baremetal_basic_ops.py
  39. 152
      ironic_tempest_plugin/tests/scenario/test_baremetal_boot_from_volume.py
  40. 153
      ironic_tempest_plugin/tests/scenario/test_baremetal_multitenancy.py
  41. 9
      releasenotes/notes/tempest_plugin_removal-009f9ce8456b16fe.yaml
  42. 5
      setup.cfg
  43. 18
      tools/flake8wrap.sh
  44. 38
      tools/ironic_tempest_plugin.SHA256SUM

3
doc/source/conf.py

@ -100,8 +100,7 @@ pygments_style = 'sphinx'
# A list of glob-style patterns that should be excluded when looking for
# source files. They are matched against the source file names relative to the
# source directory, using slashes as directory separators on all platforms.
exclude_patterns = ['api/ironic_tempest_plugin.*',
'api/ironic.drivers.modules.ansible.playbooks.*',
exclude_patterns = ['api/ironic.drivers.modules.ansible.playbooks.*',
'api/ironic.tests.*']
# Ignore the following warning: WARNING: while setting up extension

5
ironic_tempest_plugin/PLUGIN-MOVED

@ -0,0 +1,5 @@
The ironic tempest plugin code has been moved to a new dedicated repository:
openstack/ironic-tempest-plugin/
Please update any dependencies to use this new repository for doing ironic tempest tests.

15
ironic_tempest_plugin/README.rst

@ -1,15 +0,0 @@
=====================
Ironic tempest plugin
=====================
This directory contains Tempest tests to cover the Ironic project,
as well as a plugin to automatically load these tests into tempest.
See the tempest plugin documentation for information about creating
a plugin, stable API interface, TempestPlugin class interface, plugin
structure, and how to use plugins:
https://docs.openstack.org/tempest/latest/plugin.html
See the Ironic documentation for information about how to run the
tempest tests:
https://docs.openstack.org/ironic/latest/contributor/dev-quickstart.html#running-tempest-tests

0
ironic_tempest_plugin/__init__.py

53
ironic_tempest_plugin/clients.py

@ -1,53 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import clients
from tempest.common import credentials_factory as common_creds
from tempest import config
from ironic_tempest_plugin.services.baremetal.v1.json.baremetal_client import \
BaremetalClient
CONF = config.CONF
ADMIN_CREDS = None
class Manager(clients.Manager):
def __init__(self,
credentials=None):
"""Initialization of Manager class.
Setup service client and make it available for test cases.
:param credentials: type Credentials or TestResources
"""
if credentials is None:
global ADMIN_CREDS
if ADMIN_CREDS is None:
ADMIN_CREDS = common_creds.get_configured_admin_credentials()
credentials = ADMIN_CREDS
super(Manager, self).__init__(credentials)
default_params_with_timeout_values = {
'build_interval': CONF.compute.build_interval,
'build_timeout': CONF.compute.build_timeout
}
default_params_with_timeout_values.update(self.default_params)
self.baremetal_client = BaremetalClient(
self.auth_provider,
CONF.baremetal.catalog_type,
CONF.identity.region,
endpoint_type=CONF.baremetal.endpoint_type,
**default_params_with_timeout_values)

0
ironic_tempest_plugin/common/__init__.py

33
ironic_tempest_plugin/common/utils.py

@ -1,33 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def get_node(client, node_id=None, instance_uuid=None):
"""Get a node by its identifier or instance UUID.
If both node_id and instance_uuid specified, node_id will be used.
:param client: an instance of tempest plugin BaremetalClient.
:param node_id: identifier (UUID or name) of the node.
:param instance_uuid: UUID of the instance.
:returns: the requested node.
:raises: AssertionError, if neither node_id nor instance_uuid was provided
"""
assert node_id or instance_uuid, ('Either node or instance identifier '
'has to be provided.')
if node_id:
_, body = client.show_node(node_id)
return body
elif instance_uuid:
_, body = client.show_node_by_instance_uuid(instance_uuid)
if body['nodes']:
return body['nodes'][0]

112
ironic_tempest_plugin/common/waiters.py

@ -1,112 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from tempest import config
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
from ironic_tempest_plugin.common import utils
CONF = config.CONF
def _determine_and_check_timeout_interval(timeout, default_timeout,
interval, default_interval):
if timeout is None:
timeout = default_timeout
if interval is None:
interval = default_interval
if (not isinstance(timeout, six.integer_types) or
not isinstance(interval, six.integer_types) or
timeout < 0 or interval < 0):
raise AssertionError(
'timeout and interval should be >= 0 or None, current values are: '
'%(timeout)s, %(interval)s respectively. If timeout and/or '
'interval are None, the default_timeout and default_interval are '
'used, and they should be integers >= 0, current values are: '
'%(default_timeout)s, %(default_interval)s respectively.' % dict(
timeout=timeout, interval=interval,
default_timeout=default_timeout,
default_interval=default_interval)
)
return timeout, interval
def wait_for_bm_node_status(client, node_id, attr, status, timeout=None,
interval=None):
"""Waits for a baremetal node attribute to reach given status.
:param client: an instance of tempest plugin BaremetalClient.
:param node_id: identifier of the node.
:param attr: node's API-visible attribute to check status of.
:param status: desired status. Can be a list of statuses.
:param timeout: the timeout after which the check is considered as failed.
Defaults to client.build_timeout.
:param interval: an interval between show_node calls for status check.
Defaults to client.build_interval.
The client should have a show_node(node_id) method to get the node.
"""
timeout, interval = _determine_and_check_timeout_interval(
timeout, client.build_timeout, interval, client.build_interval)
if not isinstance(status, list):
status = [status]
def is_attr_in_status():
node = utils.get_node(client, node_id=node_id)
if node[attr] in status:
return True
return False
if not test_utils.call_until_true(is_attr_in_status, timeout,
interval):
message = ('Node %(node_id)s failed to reach %(attr)s=%(status)s '
'within the required time (%(timeout)s s).' %
{'node_id': node_id,
'attr': attr,
'status': status,
'timeout': timeout})
caller = test_utils.find_test_caller()
if caller:
message = '(%s) %s' % (caller, message)
raise lib_exc.TimeoutException(message)
def wait_node_instance_association(client, instance_uuid, timeout=None,
interval=None):
"""Waits for a node to be associated with instance_id.
:param client: an instance of tempest plugin BaremetalClient.
:param instance_uuid: UUID of the instance.
:param timeout: the timeout after which the check is considered as failed.
Defaults to CONF.baremetal.association_timeout.
:param interval: an interval between show_node calls for status check.
Defaults to client.build_interval.
"""
timeout, interval = _determine_and_check_timeout_interval(
timeout, CONF.baremetal.association_timeout,
interval, client.build_interval)
def is_some_node_associated():
node = utils.get_node(client, instance_uuid=instance_uuid)
return node is not None
if not test_utils.call_until_true(is_some_node_associated, timeout,
interval):
msg = ('Timed out waiting to get Ironic node by instance UUID '
'%(instance_uuid)s within the required time (%(timeout)s s).'
% {'instance_uuid': instance_uuid, 'timeout': timeout})
raise lib_exc.TimeoutException(msg)

116
ironic_tempest_plugin/config.py

@ -1,116 +0,0 @@
# Copyright 2015 NEC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from tempest import config # noqa
service_option = cfg.BoolOpt('ironic',
default=False,
help='Whether or not Ironic is expected to be '
'available')
baremetal_group = cfg.OptGroup(name='baremetal',
title='Baremetal provisioning service options',
help='When enabling baremetal tests, Nova '
'must be configured to use the Ironic '
'driver. The following parameters for the '
'[compute] section must be disabled: '
'console_output, interface_attach, '
'live_migration, pause, rescue, resize, '
'shelve, snapshot, and suspend')
baremetal_features_group = cfg.OptGroup(
name='baremetal_feature_enabled',
title="Enabled Baremetal Service Features")
BaremetalGroup = [
cfg.StrOpt('catalog_type',
default='baremetal',
help="Catalog type of the baremetal provisioning service"),
cfg.StrOpt('driver',
default='fake',
help="Driver name which Ironic uses"),
cfg.StrOpt('endpoint_type',
default='publicURL',
choices=['public', 'admin', 'internal',
'publicURL', 'adminURL', 'internalURL'],
help="The endpoint type to use for the baremetal provisioning"
" service"),
cfg.IntOpt('deploywait_timeout',
default=15,
help="Timeout for Ironic node to reach the "
"wait-callback state after powering on."),
cfg.IntOpt('active_timeout',
default=300,
help="Timeout for Ironic node to completely provision"),
cfg.IntOpt('association_timeout',
default=30,
help="Timeout for association of Nova instance and Ironic "
"node"),
cfg.IntOpt('power_timeout',
default=60,
help="Timeout for Ironic power transitions."),
cfg.IntOpt('unprovision_timeout',
default=300,
help="Timeout for unprovisioning an Ironic node. "
"Takes longer since Kilo as Ironic performs an extra "
"step in Node cleaning."),
cfg.StrOpt('min_microversion',
help="Lower version of the test target microversion range. "
"The format is 'X.Y', where 'X' and 'Y' are int values. "
"Tempest selects tests based on the range between "
"min_microversion and max_microversion. "
"If both values are None, Tempest avoids tests which "
"require a microversion."),
cfg.StrOpt('max_microversion',
default='latest',
help="Upper version of the test target microversion range. "
"The format is 'X.Y', where 'X' and 'Y' are int values. "
"Tempest selects tests based on the range between "
"min_microversion and max_microversion. "
"If both values are None, Tempest avoids tests which "
"require a microversion."),
cfg.BoolOpt('use_provision_network',
default=False,
help="Whether the Ironic/Neutron tenant isolation is enabled"),
cfg.StrOpt('whole_disk_image_ref',
help="UUID of the wholedisk image to use in the tests."),
cfg.StrOpt('whole_disk_image_url',
help="An http link to the wholedisk image to use in the "
"tests."),
cfg.StrOpt('whole_disk_image_checksum',
help="An MD5 checksum of the image."),
cfg.StrOpt('partition_image_ref',
help="UUID of the partitioned image to use in the tests."),
cfg.ListOpt('enabled_drivers',
default=['fake', 'pxe_ipmitool', 'agent_ipmitool'],
help="List of Ironic enabled drivers."),
cfg.ListOpt('enabled_hardware_types',
default=['ipmi'],
help="List of Ironic enabled hardware types."),
cfg.IntOpt('adjusted_root_disk_size_gb',
min=0,
help="Ironic adjusted disk size to use in the standalone tests "
"as instance_info/root_gb value."),
]
BaremetalFeaturesGroup = [
cfg.BoolOpt('ipxe_enabled',
default=True,
help="Defines if IPXE is enabled"),
]

559
ironic_tempest_plugin/manager.py

@ -1,559 +0,0 @@
# Copyright 2012 OpenStack Foundation
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# NOTE(soliosg) Do not edit this file. It will only stay temporarily
# in ironic, while QA refactors the tempest.scenario interface. This
# file was copied from openstack/tempest/tempest/scenario/manager.py,
# openstack/tempest commit: 82a278e88c9e9f9ba49f81c1f8dba0bca7943daf
import subprocess
from oslo_log import log
from oslo_utils import netutils
from tempest.common import compute
from tempest.common.utils.linux import remote_client
from tempest.common.utils import net_utils
from tempest.common import waiters
from tempest import config
from tempest import exceptions
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
import tempest.test
CONF = config.CONF
LOG = log.getLogger(__name__)
class ScenarioTest(tempest.test.BaseTestCase):
"""Base class for scenario tests. Uses tempest own clients. """
credentials = ['primary']
@classmethod
def setup_clients(cls):
super(ScenarioTest, cls).setup_clients()
# Clients (in alphabetical order)
cls.flavors_client = cls.os_primary.flavors_client
cls.compute_floating_ips_client = (
cls.os_primary.compute_floating_ips_client)
if CONF.service_available.glance:
# Check if glance v1 is available to determine which client to use.
if CONF.image_feature_enabled.api_v1:
cls.image_client = cls.os_primary.image_client
elif CONF.image_feature_enabled.api_v2:
cls.image_client = cls.os_primary.image_client_v2
else:
raise lib_exc.InvalidConfiguration(
'Either api_v1 or api_v2 must be True in '
'[image-feature-enabled].')
# Compute image client
cls.compute_images_client = cls.os_primary.compute_images_client
cls.keypairs_client = cls.os_primary.keypairs_client
# Nova security groups client
cls.compute_security_groups_client = (
cls.os_primary.compute_security_groups_client)
cls.compute_security_group_rules_client = (
cls.os_primary.compute_security_group_rules_client)
cls.servers_client = cls.os_primary.servers_client
cls.interface_client = cls.os_primary.interfaces_client
# Neutron network client
cls.networks_client = cls.os_primary.networks_client
cls.ports_client = cls.os_primary.ports_client
cls.routers_client = cls.os_primary.routers_client
cls.subnets_client = cls.os_primary.subnets_client
cls.floating_ips_client = cls.os_primary.floating_ips_client
cls.security_groups_client = cls.os_primary.security_groups_client
cls.security_group_rules_client = (
cls.os_primary.security_group_rules_client)
if CONF.volume_feature_enabled.api_v2:
cls.volumes_client = cls.os_primary.volumes_v2_client
cls.snapshots_client = cls.os_primary.snapshots_v2_client
else:
cls.volumes_client = cls.os_primary.volumes_client
cls.snapshots_client = cls.os_primary.snapshots_client
# ## Test functions library
#
# The create_[resource] functions only return body and discard the
# resp part which is not used in scenario tests
def _create_port(self, network_id, client=None, namestart='port-quotatest',
**kwargs):
if not client:
client = self.ports_client
name = data_utils.rand_name(namestart)
result = client.create_port(
name=name,
network_id=network_id,
**kwargs)
self.assertIsNotNone(result, 'Unable to allocate port')
port = result['port']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_port, port['id'])
return port
def create_keypair(self, client=None):
if not client:
client = self.keypairs_client
name = data_utils.rand_name(self.__class__.__name__)
# We don't need to create a keypair by pubkey in scenario
body = client.create_keypair(name=name)
self.addCleanup(client.delete_keypair, name)
return body['keypair']
def create_server(self, name=None, image_id=None, flavor=None,
validatable=False, wait_until='ACTIVE',
clients=None, **kwargs):
"""Wrapper utility that returns a test server.
This wrapper utility calls the common create test server and
returns a test server. The purpose of this wrapper is to minimize
the impact on the code of the tests already using this
function.
"""
# NOTE(jlanoux): As a first step, ssh checks in the scenario
# tests need to be run regardless of the run_validation and
# validatable parameters and thus until the ssh validation job
# becomes voting in CI. The test resources management and IP
# association are taken care of in the scenario tests.
# Therefore, the validatable parameter is set to false in all
# those tests. In this way create_server just return a standard
# server and the scenario tests always perform ssh checks.
# Needed for the cross_tenant_traffic test:
if clients is None:
clients = self.os_primary
if name is None:
name = data_utils.rand_name(self.__class__.__name__ + "-server")
vnic_type = CONF.network.port_vnic_type
# If vnic_type is configured create port for
# every network
if vnic_type:
ports = []
create_port_body = {'binding:vnic_type': vnic_type,
'namestart': 'port-smoke'}
if kwargs:
# Convert security group names to security group ids
# to pass to create_port
if 'security_groups' in kwargs:
security_groups = \
clients.security_groups_client.list_security_groups(
).get('security_groups')
sec_dict = dict([(s['name'], s['id'])
for s in security_groups])
sec_groups_names = [s['name'] for s in kwargs.pop(
'security_groups')]
security_groups_ids = [sec_dict[s]
for s in sec_groups_names]
if security_groups_ids:
create_port_body[
'security_groups'] = security_groups_ids
networks = kwargs.pop('networks', [])
else:
networks = []
# If there are no networks passed to us we look up
# for the project's private networks and create a port.
# The same behaviour as we would expect when passing
# the call to the clients with no networks
if not networks:
networks = clients.networks_client.list_networks(
**{'router:external': False, 'fields': 'id'})['networks']
# It's net['uuid'] if networks come from kwargs
# and net['id'] if they come from
# clients.networks_client.list_networks
for net in networks:
net_id = net.get('uuid', net.get('id'))
if 'port' not in net:
port = self._create_port(network_id=net_id,
client=clients.ports_client,
**create_port_body)
ports.append({'port': port['id']})
else:
ports.append({'port': net['port']})
if ports:
kwargs['networks'] = ports
self.ports = ports
tenant_network = self.get_tenant_network()
body, servers = compute.create_test_server(
clients,
tenant_network=tenant_network,
wait_until=wait_until,
name=name, flavor=flavor,
image_id=image_id, **kwargs)
self.addCleanup(waiters.wait_for_server_termination,
clients.servers_client, body['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
clients.servers_client.delete_server, body['id'])
server = clients.servers_client.show_server(body['id'])['server']
return server
def get_remote_client(self, ip_address, username=None, private_key=None):
"""Get a SSH client to a remote server
@param ip_address the server floating or fixed IP address to use
for ssh validation
@param username name of the Linux account on the remote server
@param private_key the SSH private key to use
@return a RemoteClient object
"""
if username is None:
username = CONF.validation.image_ssh_user
# Set this with 'keypair' or others to log in with keypair or
# username/password.
if CONF.validation.auth_method == 'keypair':
password = None
if private_key is None:
private_key = self.keypair['private_key']
else:
password = CONF.validation.image_ssh_password
private_key = None
linux_client = remote_client.RemoteClient(ip_address, username,
pkey=private_key,
password=password)
try:
linux_client.validate_authentication()
except Exception as e:
message = ('Initializing SSH connection to %(ip)s failed. '
'Error: %(error)s' % {'ip': ip_address,
'error': e})
caller = test_utils.find_test_caller()
if caller:
message = '(%s) %s' % (caller, message)
LOG.exception(message)
self._log_console_output()
raise
return linux_client
def _log_console_output(self, servers=None):
if not CONF.compute_feature_enabled.console_output:
LOG.debug('Console output not supported, cannot log')
return
if not servers:
servers = self.servers_client.list_servers()
servers = servers['servers']
for server in servers:
try:
console_output = self.servers_client.get_console_output(
server['id'])['output']
LOG.debug('Console output for %s\nbody=\n%s',
server['id'], console_output)
except lib_exc.NotFound:
LOG.debug("Server %s disappeared(deleted) while looking "
"for the console log", server['id'])
def rebuild_server(self, server_id, image=None,
preserve_ephemeral=False, wait=True,
rebuild_kwargs=None):
if image is None:
image = CONF.compute.image_ref
rebuild_kwargs = rebuild_kwargs or {}
LOG.debug("Rebuilding server (id: %s, image: %s, preserve eph: %s)",
server_id, image, preserve_ephemeral)
self.servers_client.rebuild_server(
server_id=server_id, image_ref=image,
preserve_ephemeral=preserve_ephemeral,
**rebuild_kwargs)
if wait:
waiters.wait_for_server_status(self.servers_client,
server_id, 'ACTIVE')
def ping_ip_address(self, ip_address, should_succeed=True,
ping_timeout=None, mtu=None):
timeout = ping_timeout or CONF.validation.ping_timeout
cmd = ['ping', '-c1', '-w1']
if mtu:
cmd += [
# don't fragment
'-M', 'do',
# ping receives just the size of ICMP payload
'-s', str(net_utils.get_ping_payload_size(mtu, 4))
]
cmd.append(ip_address)
def ping():
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.communicate()
return (proc.returncode == 0) == should_succeed
caller = test_utils.find_test_caller()
LOG.debug('%(caller)s begins to ping %(ip)s in %(timeout)s sec and the'
' expected result is %(should_succeed)s', {
'caller': caller, 'ip': ip_address, 'timeout': timeout,
'should_succeed':
'reachable' if should_succeed else 'unreachable'
})
result = test_utils.call_until_true(ping, timeout, 1)
LOG.debug('%(caller)s finishes ping %(ip)s in %(timeout)s sec and the '
'ping result is %(result)s', {
'caller': caller, 'ip': ip_address, 'timeout': timeout,
'result': 'expected' if result else 'unexpected'
})
return result
def check_vm_connectivity(self, ip_address,
username=None,
private_key=None,
should_connect=True,
mtu=None):
"""Check server connectivity
:param ip_address: server to test against
:param username: server's ssh username
:param private_key: server's ssh private key to be used
:param should_connect: True/False indicates positive/negative test
positive - attempt ping and ssh
negative - attempt ping and fail if succeed
:param mtu: network MTU to use for connectivity validation
:raises: AssertError if the result of the connectivity check does
not match the value of the should_connect param
"""
if should_connect:
msg = "Timed out waiting for %s to become reachable" % ip_address
else:
msg = "ip address %s is reachable" % ip_address
self.assertTrue(self.ping_ip_address(ip_address,
should_succeed=should_connect,
mtu=mtu),
msg=msg)
if should_connect:
# no need to check ssh for negative connectivity
self.get_remote_client(ip_address, username, private_key)
def create_floating_ip(self, thing, pool_name=None):
"""Create a floating IP and associates to a server on Nova"""
if not pool_name:
pool_name = CONF.network.floating_network_name
floating_ip = (self.compute_floating_ips_client.
create_floating_ip(pool=pool_name)['floating_ip'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.compute_floating_ips_client.delete_floating_ip,
floating_ip['id'])
self.compute_floating_ips_client.associate_floating_ip_to_server(
floating_ip['ip'], thing['id'])
return floating_ip
def create_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
private_key=None):
ssh_client = self.get_remote_client(ip_address,
private_key=private_key)
if dev_name is not None:
ssh_client.make_fs(dev_name)
ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,
mount_path))
cmd_timestamp = 'sudo sh -c "date > %s/timestamp; sync"' % mount_path
ssh_client.exec_command(cmd_timestamp)
timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
% mount_path)
if dev_name is not None:
ssh_client.exec_command('sudo umount %s' % mount_path)
return timestamp
def get_server_ip(self, server):
"""Get the server fixed or floating IP.
Based on the configuration we're in, return a correct ip
address for validating that a guest is up.
"""
if CONF.validation.connect_method == 'floating':
# The tests calling this method don't have a floating IP
# and can't make use of the validation resources. So the
# method is creating the floating IP there.
return self.create_floating_ip(server)['ip']
elif CONF.validation.connect_method == 'fixed':
# Determine the network name to look for based on config or creds
# provider network resources.
if CONF.validation.network_for_ssh:
addresses = server['addresses'][
CONF.validation.network_for_ssh]
else:
creds_provider = self._get_credentials_provider()
net_creds = creds_provider.get_primary_creds()
network = getattr(net_creds, 'network', None)
addresses = (server['addresses'][network['name']]
if network else [])
for address in addresses:
if (address['version'] == CONF.validation.ip_version_for_ssh
and address['OS-EXT-IPS:type'] == 'fixed'):
return address['addr']
raise exceptions.ServerUnreachable(server_id=server['id'])
else:
raise lib_exc.InvalidConfiguration()
def _get_router(self, client=None, tenant_id=None):
"""Retrieve a router for the given tenant id.
If a public router has been configured, it will be returned.
If a public router has not been configured, but a public
network has, a tenant router will be created and returned that
routes traffic to the public network.
"""
if not client:
client = self.routers_client
if not tenant_id:
tenant_id = client.tenant_id
router_id = CONF.network.public_router_id
network_id = CONF.network.public_network_id
if router_id:
body = client.show_router(router_id)
return body['router']
elif network_id:
router = self._create_router(client, tenant_id)
kwargs = {'external_gateway_info': dict(network_id=network_id)}
router = client.update_router(router['id'], **kwargs)['router']
return router
else:
raise Exception("Neither of 'public_router_id' or "
"'public_network_id' has been defined.")
def _create_router(self, client=None, tenant_id=None,
namestart='router-smoke'):
if not client:
client = self.routers_client
if not tenant_id:
tenant_id = client.tenant_id
name = data_utils.rand_name(namestart)
result = client.create_router(name=name,
admin_state_up=True,
tenant_id=tenant_id)
router = result['router']
self.assertEqual(router['name'], name)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_router,
router['id'])
return router
class NetworkScenarioTest(ScenarioTest):
"""Base class for network scenario tests.
This class provide helpers for network scenario tests, using the neutron
API. Helpers from ancestor which use the nova network API are overridden
with the neutron API.
This Class also enforces using Neutron instead of novanetwork.
Subclassed tests will be skipped if Neutron is not enabled
"""
credentials = ['primary', 'admin']
@classmethod
def skip_checks(cls):
super(NetworkScenarioTest, cls).skip_checks()
if not CONF.service_available.neutron:
raise cls.skipException('Neutron not available')
def _create_network(self, networks_client=None,
tenant_id=None,
namestart='network-smoke-',
port_security_enabled=True):
if not networks_client:
networks_client = self.networks_client
if not tenant_id:
tenant_id = networks_client.tenant_id
name = data_utils.rand_name(namestart)
network_kwargs = dict(name=name, tenant_id=tenant_id)
# Neutron disables port security by default so we have to check the
# config before trying to create the network with port_security_enabled
if CONF.network_feature_enabled.port_security:
network_kwargs['port_security_enabled'] = port_security_enabled
result = networks_client.create_network(**network_kwargs)
network = result['network']
self.assertEqual(network['name'], name)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
networks_client.delete_network,
network['id'])
return network
def _get_server_port_id_and_ip4(self, server, ip_addr=None):
ports = self.os_admin.ports_client.list_ports(
device_id=server['id'], fixed_ip=ip_addr)['ports']
# A port can have more than one IP address in some cases.
# If the network is dual-stack (IPv4 + IPv6), this port is associated
# with 2 subnets
p_status = ['ACTIVE']
# NOTE(vsaienko) With Ironic, instances live on separate hardware
# servers. Neutron does not bind ports for Ironic instances, as a
# result the port remains in the DOWN state.
# TODO(vsaienko) remove once bug: #1599836 is resolved.
if getattr(CONF.service_available, 'ironic', False):
p_status.append('DOWN')
port_map = [(p["id"], fxip["ip_address"])
for p in ports
for fxip in p["fixed_ips"]
if netutils.is_valid_ipv4(fxip["ip_address"])
and p['status'] in p_status]
inactive = [p for p in ports if p['status'] != 'ACTIVE']
if inactive:
LOG.warning("Instance has ports that are not ACTIVE: %s", inactive)
self.assertNotEqual(0, len(port_map),
"No IPv4 addresses found in: %s" % ports)
self.assertEqual(len(port_map), 1,
"Found multiple IPv4 addresses: %s. "
"Unable to determine which port to target."
% port_map)
return port_map[0]
def create_floating_ip(self, thing, external_network_id=None,
port_id=None, client=None):
"""Create a floating IP and associates to a resource/port on Neutron"""
if not external_network_id:
external_network_id = CONF.network.public_network_id
if not client:
client = self.floating_ips_client
if not port_id:
port_id, ip4 = self._get_server_port_id_and_ip4(thing)
else:
ip4 = None
result = client.create_floatingip(
floating_network_id=external_network_id,
port_id=port_id,
tenant_id=thing['tenant_id'],
fixed_ip_address=ip4
)
floating_ip = result['floatingip']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_floatingip,
floating_ip['id'])
return floating_ip

46
ironic_tempest_plugin/plugin.py

@ -1,46 +0,0 @@
# Copyright 2015 NEC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from tempest import config
from tempest.test_discover import plugins
from ironic_tempest_plugin import config as project_config
_opts = [
(project_config.baremetal_group, project_config.BaremetalGroup),
(project_config.baremetal_features_group,
project_config.BaremetalFeaturesGroup)
]
class IronicTempestPlugin(plugins.TempestPlugin):
def load_tests(self):
base_path = os.path.split(os.path.dirname(
os.path.abspath(__file__)))[0]
test_dir = "ironic_tempest_plugin/tests"
full_test_dir = os.path.join(base_path, test_dir)
return full_test_dir, base_path
def register_opts(self, conf):
conf.register_opt(project_config.service_option,
group='service_available')
for group, option in _opts:
config.register_opt_group(conf, group, option)
def get_opt_lists(self):
return [(group.name, option) for group, option in _opts]

0
ironic_tempest_plugin/services/__init__.py

0
ironic_tempest_plugin/services/baremetal/__init__.py

264
ironic_tempest_plugin/services/baremetal/base.py

@ -1,264 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
from oslo_serialization import jsonutils as json
from six.moves import http_client
from six.moves.urllib import parse as urllib
from tempest.lib.common import api_version_utils
from tempest.lib.common import rest_client
# NOTE(vsaienko): concurrent tests work because they are launched in
# separate processes so global variables are not shared among them.
BAREMETAL_MICROVERSION = None
def set_baremetal_api_microversion(baremetal_microversion):
global BAREMETAL_MICROVERSION
BAREMETAL_MICROVERSION = baremetal_microversion
def reset_baremetal_api_microversion():
global BAREMETAL_MICROVERSION
BAREMETAL_MICROVERSION = None
def handle_errors(f):
"""A decorator that allows to ignore certain types of errors."""
@functools.wraps(f)
def wrapper(*args, **kwargs):
param_name = 'ignore_errors'
ignored_errors = kwargs.get(param_name, tuple())
if param_name in kwargs:
del kwargs[param_name]
try:
return f(*args, **kwargs)
except ignored_errors:
# Silently ignore errors
pass
return wrapper
class BaremetalClient(rest_client.RestClient):
"""Base Tempest REST client for Ironic API."""
api_microversion_header_name = 'X-OpenStack-Ironic-API-Version'
uri_prefix = ''
def get_headers(self):
headers = super(BaremetalClient, self).get_headers()
if BAREMETAL_MICROVERSION:
headers[self.api_microversion_header_name] = BAREMETAL_MICROVERSION
return headers
def request(self, *args, **kwargs):
resp, resp_body = super(BaremetalClient, self).request(*args, **kwargs)
if (BAREMETAL_MICROVERSION and
BAREMETAL_MICROVERSION != api_version_utils.LATEST_MICROVERSION):
api_version_utils.assert_version_header_matches_request(
self.api_microversion_header_name,
BAREMETAL_MICROVERSION,
resp)
return resp, resp_body
def serialize(self, object_dict):
"""Serialize an Ironic object."""
return json.dumps(object_dict)
def deserialize(self, object_str):
"""Deserialize an Ironic object."""
return json.loads(object_str)
def _get_uri(self, resource_name, uuid=None, permanent=False):
"""Get URI for a specific resource or object.
:param resource_name: The name of the REST resource, e.g., 'nodes'.
:param uuid: The unique identifier of an object in UUID format.
:returns: Relative URI for the resource or object.
"""
prefix = self.uri_prefix if not permanent else ''
return '{pref}/{res}{uuid}'.format(pref=prefix,
res=resource_name,
uuid='/%s' % uuid if uuid else '')
def _make_patch(self, allowed_attributes, **kwargs):
"""Create a JSON patch according to RFC 6902.
:param allowed_attributes: An iterable object that contains a set of
allowed attributes for an object.
:param **kwargs: Attributes and new values for them.
:returns: A JSON path that sets values of the specified attributes to
the new ones.
"""
def get_change(kwargs, path='/'):
for name, value in kwargs.items():
if isinstance(value, dict):
for ch in get_change(value, path + '%s/' % name):
yield ch
else:
if value is None:
yield {'path': path + name,
'op': 'remove'}
else:
yield {'path': path + name,
'value': value,
'op': 'replace'}
patch = [ch for ch in get_change(kwargs)
if ch['path'].lstrip('/') in allowed_attributes]
return patch
def _list_request(self, resource, permanent=False, headers=None,
extra_headers=False, **kwargs):
"""Get the list of objects of the specified type.
:param resource: The name of the REST resource, e.g., 'nodes'.
:param headers: List of headers to use in request.
:param extra_headers: Specify whether to use headers.
:param **kwargs: Parameters for the request.
:returns: A tuple with the server response and deserialized JSON list
of objects
"""
uri = self._get_uri(resource, permanent=permanent)
if kwargs:
uri += "?%s" % urllib.urlencode(kwargs)
resp, body = self.get(uri, headers=headers,
extra_headers=extra_headers)
self.expected_success(http_client.OK, resp.status)
return resp, self.deserialize(body)
def _show_request(self,
resource,
uuid=None,
permanent=False,
**kwargs):
"""Gets a specific object of the specified type.
:param uuid: Unique identifier of the object in UUID format.
:returns: Serialized object as a dictionary.
"""
if 'uri' in kwargs:
uri = kwargs['uri']
else:
uri = self._get_uri(resource, uuid=uuid, permanent=permanent)
resp, body = self.get(uri)
self.expected_success(http_client.OK, resp.status)
return resp, self.deserialize(body)
def _create_request(self, resource, object_dict):
"""Create an object of the specified type.
:param resource: The name of the REST resource, e.g., 'nodes'.
:param object_dict: A Python dict that represents an object of the
specified type.
:returns: A tuple with the server response and the deserialized created
object.
"""
body = self.serialize(object_dict)
uri = self._get_uri(resource)
resp, body = self.post(uri, body=body)
self.expected_success(http_client.CREATED, resp.status)
return resp, self.deserialize(body)
def _create_request_no_response_body(self, resource, object_dict):
"""Create an object of the specified type.
Do not expect any body in the response.
:param resource: The name of the REST resource, e.g., 'nodes'.
:param object_dict: A Python dict that represents an object of the
specified type.
:returns: The server response.
"""
body = self.serialize(object_dict)
uri = self._get_uri(resource)
resp, body = self.post(uri, body=body)
self.expected_success(http_client.NO_CONTENT, resp.status)
return resp
def _delete_request(self, resource, uuid):
"""Delete specified object.
:param resource: The name of the REST resource, e.g., 'nodes'.
:param uuid: The unique identifier of an object in UUID format.
:returns: A tuple with the server response and the response body.
"""
uri = self._get_uri(resource, uuid)
resp, body = self.delete(uri)
self.expected_success(http_client.NO_CONTENT, resp.status)
return resp, body
def _patch_request(self, resource, uuid, patch_object):
"""Update specified object with JSON-patch.
:param resource: The name of the REST resource, e.g., 'nodes'.
:param uuid: The unique identifier of an object in UUID format.
:returns: A tuple with the server response and the serialized patched
object.
"""
uri = self._get_uri(resource, uuid)
patch_body = json.dumps(patch_object)
resp, body = self.patch(uri, body=patch_body)
self.expected_success(http_client.OK, resp.status)
return resp, self.deserialize(body)
@handle_errors
def get_api_description(self):
"""Retrieves all versions of the Ironic API."""
return self._list_request('', permanent=True)
@handle_errors
def get_version_description(self, version='v1'):
"""Retrieves the description of the API.
:param version: The version of the API. Default: 'v1'.
:returns: Serialized description of API resources.
"""
return self._list_request(version, permanent=True)
def _put_request(self, resource, put_object):
"""Update specified object with JSON-patch."""
uri = self._get_uri(resource)
put_body = json.dumps(put_object)
resp, body = self.put(uri, body=put_body)
self.expected_success([http_client.ACCEPTED, http_client.NO_CONTENT],
resp.status)
return resp, body

0
ironic_tempest_plugin/services/baremetal/v1/__init__.py

0
ironic_tempest_plugin/services/baremetal/v1/json/__init__.py

641
ironic_tempest_plugin/services/baremetal/v1/json/baremetal_client.py

@ -1,641 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six.moves import http_client
from ironic_tempest_plugin.services.baremetal import base
class BaremetalClient(base.BaremetalClient):
"""Base Tempest REST client for Ironic API v1."""
version = '1'
uri_prefix = 'v1'
@base.handle_errors
def list_nodes(self, **kwargs):
"""List all existing nodes."""
return self._list_request('nodes', **kwargs)
@base.handle_errors
def list_nodes_detail(self, **kwargs):
"""Detailed list of all existing nodes."""
return self._list_request('/nodes/detail', **kwargs)
@base.handle_errors
def list_chassis(self):
"""List all existing chassis."""
return self._list_request('chassis')
@base.handle_errors
def list_chassis_nodes(self, chassis_uuid):
"""List all nodes associated with a chassis."""
return self._list_request('/chassis/%s/nodes' % chassis_uuid)
@base.handle_errors
def list_ports(self, **kwargs):
"""List all existing ports."""
return self._list_request('ports', **kwargs)
@base.handle_errors
def list_portgroups(self, **kwargs):
"""List all existing port groups."""
return self._list_request('portgroups', **kwargs)
@base.handle_errors
def list_volume_connectors(self, **kwargs):
"""List all existing volume connectors."""
return self._list_request('volume/connectors', **kwargs)
@base.handle_errors
def list_volume_targets(self, **kwargs):
"""List all existing volume targets."""
return self._list_request('volume/targets', **kwargs)
@base.handle_errors
def list_node_ports(self, uuid):
"""List all ports associated with the node."""
return self._list_request('/nodes/%s/ports' % uuid)
@base.handle_errors
def list_nodestates(self, uuid):
"""List all existing states."""
return self._list_request('/nodes/%s/states' % uuid)
@base.handle_errors
def list_ports_detail(self, **kwargs):
"""Details list all existing ports."""
return self._list_request('/ports/detail', **kwargs)
@base.handle_errors
def list_drivers(self):
"""List all existing drivers."""
return self._list_request('drivers')
@base.handle_errors
def show_node(self, uuid):
"""Gets a specific node.
:param uuid: Unique identifier of the node in UUID format.
:return: Serialized node as a dictionary.
"""
return self._show_request('nodes', uuid)
@base.handle_errors
def show_node_by_instance_uuid(self, instance_uuid):
"""Gets a node associated with given instance uuid.
:param instance_uuid: Unique identifier of the instance in UUID format.
:return: Serialized node as a dictionary.
"""
uri = '/nodes/detail?instance_uuid=%s' % instance_uuid
return self._show_request('nodes',
uuid=None,
uri=uri)
@base.handle_errors
def show_chassis(self, uuid):
"""Gets a specific chassis.
:param uuid: Unique identifier of the chassis in UUID format.
:return: Serialized chassis as a dictionary.
"""
return self._show_request('chassis', uuid)
@base.handle_errors
def show_port(self, uuid):
"""Gets a specific port.
:param uuid: Unique identifier of the port in UUID format.
:return: Serialized port as a dictionary.
"""
return self._show_request('ports', uuid)
@base.handle_errors
def show_portgroup(self, portgroup_ident):
"""Gets a specific port group.
:param portgroup_ident: Name or UUID of the port group.
:return: Serialized port group as a dictionary.
"""
return self._show_request('portgroups', portgroup_ident)
@base.handle_errors
def show_volume_connector(self, volume_connector_ident):
"""Gets a specific volume connector.
:param volume_connector_ident: UUID of the volume connector.
:return: Serialized volume connector as a dictionary.
"""
return self._show_request('volume/connectors', volume_connector_ident)
@base.handle_errors
def show_volume_target(self, volume_target_ident):
"""Gets a specific volume target.
:param volume_target_ident: UUID of the volume target.
:return: Serialized volume target as a dictionary.
"""
return self._show_request('volume/targets', volume_target_ident)
@base.handle_errors
def show_port_by_address(self, address):
"""Gets a specific port by address.
:param address: MAC address of the port.
:return: Serialized port as a dictionary.
"""
uri = '/ports/detail?address=%s' % address
return self._show_request('ports', uuid=None, uri=uri)
def show_driver(self, driver_name):
"""Gets a specific driver.
:param driver_name: Name of driver.
:return: Serialized driver as a dictionary.
"""
return self._show_request('drivers', driver_name)
@base.handle_errors
def create_node(self, chassis_id=None, **kwargs):
"""Create a baremetal node with the specified parameters.
:param chassis_id: The unique identifier of the chassis.
:param cpu_arch: CPU architecture of the node. Default: x86_64.
:param cpus: Number of CPUs. Default: 8.
:param local_gb: Disk size. Default: 1024.
:param memory_mb: Available RAM. Default: 4096.
:param driver: Driver name. Default: "fake"
:return: A tuple with the server response and the created node.
"""
node = {}
if kwargs.get('resource_class'):
node['resource_class'] = kwargs['resource_class']
node.update(
{'chassis_uuid': chassis_id,
'properties': {'cpu_arch': kwargs.get('cpu_arch', 'x86_64'),
'cpus': kwargs.get('cpus', 8),
'local_gb': kwargs.get('local_gb', 1024),
'memory_mb': kwargs.get('memory_mb', 4096)},
'driver': kwargs.get('driver', 'fake')}
)
return self._create_request('nodes', node)
@base.handle_errors
def create_chassis(self, **kwargs):
"""Create a chassis with the specified parameters.
:param description: The description of the chassis.
Default: test-chassis
:return: A tuple with the server response and the created chassis.
"""
chassis = {'description': kwargs.get('description', 'test-chassis')}
if 'uuid' in kwargs:
chassis.update({'uuid': kwargs.get('uuid')})
return self._create_request('chassis', chassis)
@base.handle_errors
def create_port(self, node_id, **kwargs):
"""Create a port with the specified parameters.
:param node_id: The ID of the node which owns the port.
:param address: MAC address of the port.
:param extra: Meta data of the port. Default: {'foo': 'bar'}.
:param uuid: UUID of the port.
:param portgroup_uuid: The UUID of a portgroup of which this port is a
member.
:param physical_network: The physical network to which the port is
attached.
:return: A tuple with the server response and the created port.
"""
port = {'extra': kwargs.get('extra', {'foo': 'bar'}),
'uuid': kwargs['uuid']}
if node_id is not None:
port['node_uuid'] = node_id
for key in ('address', 'physical_network', 'portgroup_uuid'):
if kwargs.get(key) is not None:
port[key] = kwargs[key]
return self._create_request('ports', port)
@base.handle_errors
def create_portgroup(self, node_uuid, **kwargs):
"""Create a port group with the specified parameters.
:param node_uuid: The UUID of the node which owns the port group.
:param kwargs:
address: MAC address of the port group. Optional.
extra: Meta data of the port group. Default: {'foo': 'bar'}.
name: Name of the port group. Optional.
uuid: UUID of the port group. Optional.
:return: A tuple with the server response and the created port group.
"""
portgroup = {'extra': kwargs.get('extra', {'foo': 'bar'})}