Merge "trivial: Remove some single use function from utils"

This commit is contained in:
Jenkins
2017-09-06 16:08:16 +00:00
committed by Gerrit Code Review
6 changed files with 122 additions and 100 deletions

View File

@@ -20,6 +20,7 @@
from __future__ import print_function
import argparse
import inspect
import traceback
from oslo_log import log as logging
@@ -29,7 +30,6 @@ import nova.conf
import nova.db.api
from nova import exception
from nova.i18n import _
from nova import utils
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
@@ -52,6 +52,35 @@ def block_db_access(service_name):
nova.db.api.IMPL = NoDB()
def validate_args(fn, *args, **kwargs):
"""Check that the supplied args are sufficient for calling a function.
>>> validate_args(lambda a: None)
Traceback (most recent call last):
...
MissingArgs: Missing argument(s): a
>>> validate_args(lambda a, b, c, d: None, 0, c=1)
Traceback (most recent call last):
...
MissingArgs: Missing argument(s): b, d
:param fn: the function to check
:param arg: the positional arguments supplied
:param kwargs: the keyword arguments supplied
"""
argspec = inspect.getargspec(fn)
num_defaults = len(argspec.defaults or [])
required_args = argspec.args[:len(argspec.args) - num_defaults]
if six.get_method_self(fn) is not None:
required_args.pop(0)
missing = [arg for arg in required_args if arg not in kwargs]
missing = missing[len(args):]
return missing
# Decorators for actions
def args(*args, **kwargs):
"""Decorator which adds the given args and kwargs to the args list of
@@ -155,7 +184,7 @@ def get_action_fn():
# call the action with the remaining arguments
# check arguments
missing = utils.validate_args(fn, *fn_args, **fn_kwargs)
missing = validate_args(fn, *fn_args, **fn_kwargs)
if missing:
# NOTE(mikal): this isn't the most helpful error message ever. It is
# long, and tells you a lot of things you probably don't want to know

View File

@@ -67,10 +67,11 @@ from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_utils import encodeutils
from oslo_utils import importutils
from oslo_utils import uuidutils
import prettytable
import six
import six.moves.urllib.parse as urlparse
from sqlalchemy.engine import url as sqla_url
@@ -674,6 +675,37 @@ class DbCommands(object):
def __init__(self):
pass
@staticmethod
def print_dict(dct, dict_property="Property", dict_value='Value'):
"""Print a `dict` as a table of two columns.
:param dct: `dict` to print
:param dict_property: name of the first column
:param wrap: wrapping for the second column
:param dict_value: header label for the value (second) column
"""
pt = prettytable.PrettyTable([dict_property, dict_value])
pt.align = 'l'
for k, v in sorted(dct.items()):
# convert dict to str to check length
if isinstance(v, dict):
v = six.text_type(v)
# if value has a newline, add in multiple rows
# e.g. fault with stacktrace
if v and isinstance(v, six.string_types) and r'\n' in v:
lines = v.strip().split(r'\n')
col1 = k
for line in lines:
pt.add_row([col1, line])
col1 = ''
else:
pt.add_row([k, v])
if six.PY2:
print(encodeutils.safe_encode(pt.get_string()))
else:
print(encodeutils.safe_encode(pt.get_string()).decode())
@args('--version', metavar='<version>', help=argparse.SUPPRESS)
@args('--local_cell', action='store_true',
help='Only sync db in the local cell: do not attempt to fan-out'
@@ -758,8 +790,8 @@ class DbCommands(object):
sys.stdout.write('.')
if verbose:
if table_to_rows_archived:
utils.print_dict(table_to_rows_archived, _('Table'),
dict_value=_('Number of Rows Archived'))
self.print_dict(table_to_rows_archived, _('Table'),
dict_value=_('Number of Rows Archived'))
else:
print(_('Nothing was archived.'))
# NOTE(danms): Return nonzero if we archived something
@@ -1039,6 +1071,36 @@ class CellCommands(object):
'been deprecated. They will be removed in an upcoming '
'release.')
@staticmethod
def parse_server_string(server_str):
"""Parses the given server_string and returns a tuple of host and port.
If it's not a combination of host part and port, the port element is an
empty string. If the input is invalid expression, return a tuple of two
empty strings.
"""
try:
# First of all, exclude pure IPv6 address (w/o port).
if netaddr.valid_ipv6(server_str):
return (server_str, '')
# Next, check if this is IPv6 address with a port number
# combination.
if server_str.find("]:") != -1:
(address, port) = server_str.replace('[', '', 1).split(']:')
return (address, port)
# Third, check if this is a combination of an address and a port
if server_str.find(':') == -1:
return (server_str, '')
# This must be a combination of an address and a port
(address, port) = server_str.split(':')
return (address, port)
except (ValueError, netaddr.AddrFormatError):
print('Invalid server_string: %s' % server_str)
return ('', '')
def _create_transport_hosts(self, username, password,
broker_hosts=None, hostname=None, port=None):
"""Returns a list of oslo.messaging.TransportHost objects."""
@@ -1048,7 +1110,7 @@ class CellCommands(object):
hosts = broker_hosts.split(',')
for host in hosts:
host = host.strip()
broker_hostname, broker_port = utils.parse_server_string(host)
broker_hostname, broker_port = self.parse_server_string(host)
if not broker_port:
msg = _('Invalid broker_hosts value: %s. It should be'
' in hostname:port format') % host

View File

@@ -65,6 +65,24 @@ LOG = logging.getLogger(__name__)
CONF = nova.conf.CONF
def get_my_linklocal(interface):
try:
if_str = utils.execute(
'ip', '-f', 'inet6', '-o', 'addr', 'show', interface)
condition = '\s+inet6\s+([0-9a-f:]+)/\d+\s+scope\s+link'
links = [re.search(condition, x) for x in if_str[0].split('\n')]
address = [w.group(1) for w in links if w is not None]
if address[0] is not None:
return address[0]
else:
msg = _('Link Local address is not found.:%s') % if_str
raise exception.NovaException(msg)
except Exception as ex:
msg = _("Couldn't get Link Local IP of %(interface)s"
" :%(ex)s") % {'interface': interface, 'ex': ex}
raise exception.NovaException(msg)
class RPCAllocateFixedIP(object):
"""Mixin class originally for FlatDCHP and VLAN network managers.
@@ -1816,7 +1834,7 @@ class FlatDHCPManager(RPCAllocateFixedIP, floating_ips.FloatingIP,
self.driver.update_dhcp(elevated, dev, network)
if CONF.use_ipv6:
self.driver.update_ra(context, dev, network)
gateway = utils.get_my_linklocal(dev)
gateway = get_my_linklocal(dev)
network.gateway_v6 = gateway
network.save()
@@ -2083,7 +2101,7 @@ class VlanManager(RPCAllocateFixedIP, floating_ips.FloatingIP, NetworkManager):
self.driver.update_dhcp(elevated, dev, network)
if CONF.use_ipv6:
self.driver.update_ra(context, dev, network)
gateway = utils.get_my_linklocal(dev)
gateway = get_my_linklocal(dev)
network.gateway_v6 = gateway
network.save()

View File

@@ -15,6 +15,7 @@
import sys
import netaddr
from neutronclient.common import exceptions as n_exc
from neutronclient.neutron import v2_0 as neutronv20
from oslo_log import log as logging
@@ -273,7 +274,8 @@ class SecurityGroupAPI(security_group_base.SecurityGroupBase):
if not rule.get('cidr'):
new_rule['ethertype'] = 'IPv4'
else:
new_rule['ethertype'] = utils.get_ip_version(rule.get('cidr'))
version = netaddr.IPNetwork(rule.get('cidr')).version
new_rule['ethertype'] = 'IPv6' if version == 6 else 'IPv4'
new_rule['remote_ip_prefix'] = rule.get('cidr')
new_rule['security_group_id'] = rule.get('parent_group_id')
new_rule['remote_group_id'] = rule.get('group_id')

View File

@@ -105,7 +105,7 @@ class TestCmdCommon(test.NoDBTestCase):
mock_method_of.assert_called_once_with(mock_fn.return_value)
mock_print.assert_called_once_with(' '.join([k for k, v in actions]))
@mock.patch.object(cmd_common.utils, 'validate_args')
@mock.patch.object(cmd_common, 'validate_args')
@mock.patch.object(cmd_common, 'CONF')
def test_get_action_fn(self, mock_CONF, mock_validate_args):
mock_validate_args.return_value = None
@@ -127,7 +127,7 @@ class TestCmdCommon(test.NoDBTestCase):
self.assertTrue(actual_kwargs['bar'])
self.assertNotIn('missing', actual_kwargs)
@mock.patch.object(cmd_common.utils, 'validate_args')
@mock.patch.object(cmd_common, 'validate_args')
@mock.patch.object(cmd_common, 'CONF')
def test_get_action_fn_missing_args(self, mock_CONF, mock_validate_args):
# Don't leak the actual print call

View File

@@ -45,7 +45,6 @@ from oslo_utils import importutils
from oslo_utils import strutils
from oslo_utils import timeutils
from oslo_utils import units
import prettytable
import six
from six.moves import range
@@ -385,23 +384,6 @@ def generate_password(length=None, symbolgroups=DEFAULT_PASSWORD_SYMBOLS):
return ''.join(password)
def get_my_linklocal(interface):
try:
if_str = execute('ip', '-f', 'inet6', '-o', 'addr', 'show', interface)
condition = '\s+inet6\s+([0-9a-f:]+)/\d+\s+scope\s+link'
links = [re.search(condition, x) for x in if_str[0].split('\n')]
address = [w.group(1) for w in links if w is not None]
if address[0] is not None:
return address[0]
else:
msg = _('Link Local address is not found.:%s') % if_str
raise exception.NovaException(msg)
except Exception as ex:
msg = _("Couldn't get Link Local IP of %(interface)s"
" :%(ex)s") % {'interface': interface, 'ex': ex}
raise exception.NovaException(msg)
# TODO(sfinucan): Replace this with the equivalent from oslo.utils
def utf8(value):
"""Try to turn a string into utf-8 if possible.
@@ -458,17 +440,6 @@ def get_shortened_ipv6_cidr(address):
return str(net.cidr)
def get_ip_version(network):
"""Returns the IP version of a network (IPv4 or IPv6).
Raises AddrFormatError if invalid network.
"""
if netaddr.IPNetwork(network).version == 6:
return "IPv6"
elif netaddr.IPNetwork(network).version == 4:
return "IPv4"
def safe_ip_format(ip):
"""Transform ip string to "safe" format.
@@ -1296,63 +1267,3 @@ def isotime(at=None):
def strtime(at):
return at.strftime("%Y-%m-%dT%H:%M:%S.%f")
def print_dict(dct, dict_property="Property", dict_value='Value'):
"""Print a `dict` as a table of two columns.
:param dct: `dict` to print
:param dict_property: name of the first column
:param wrap: wrapping for the second column
:param dict_value: header label for the value (second) column
"""
pt = prettytable.PrettyTable([dict_property, dict_value])
pt.align = 'l'
for k, v in sorted(dct.items()):
# convert dict to str to check length
if isinstance(v, dict):
v = six.text_type(v)
# if value has a newline, add in multiple rows
# e.g. fault with stacktrace
if v and isinstance(v, six.string_types) and r'\n' in v:
lines = v.strip().split(r'\n')
col1 = k
for line in lines:
pt.add_row([col1, line])
col1 = ''
else:
pt.add_row([k, v])
if six.PY2:
print(encodeutils.safe_encode(pt.get_string()))
else:
print(encodeutils.safe_encode(pt.get_string()).decode())
def validate_args(fn, *args, **kwargs):
"""Check that the supplied args are sufficient for calling a function.
>>> validate_args(lambda a: None)
Traceback (most recent call last):
...
MissingArgs: Missing argument(s): a
>>> validate_args(lambda a, b, c, d: None, 0, c=1)
Traceback (most recent call last):
...
MissingArgs: Missing argument(s): b, d
:param fn: the function to check
:param arg: the positional arguments supplied
:param kwargs: the keyword arguments supplied
"""
argspec = inspect.getargspec(fn)
num_defaults = len(argspec.defaults or [])
required_args = argspec.args[:len(argspec.args) - num_defaults]
if six.get_method_self(fn) is not None:
required_args.pop(0)
missing = [arg for arg in required_args if arg not in kwargs]
missing = missing[len(args):]
return missing