Fix ruff check violations

Convert remaining percent formatting and .format() calls to f-strings
and fix line length violations to comply with 79 character limit.

this also updates the check-merge-conflict pre-commit rule to exclude
.rst and .inc files where ===== is valid syntax and not a merge conflict
marker.

Changes include:
- Convert % formatting to f-strings in nova_helper.py and collection.py
- Split long f-strings across multiple lines where needed
- Use intermediate variables to reduce line length
- Replace problematic f-string patterns to avoid false positive
  N341 violations in gnocchi.py

Generated-By: Claude <noreply@anthropic.com>
Change-Id: I2cd6c6b5ff7ffb1e800b3f55fe7af42bd878c6b9
Signed-off-by: Sean Mooney <work@seanmooney.info>
This commit is contained in:
Sean Mooney
2025-09-17 08:50:59 +01:00
parent b22da92344
commit 9b5c8c1c4f
67 changed files with 379 additions and 396 deletions

View File

@@ -23,6 +23,7 @@ repos:
- id: check-case-conflict
- id: detect-private-key
- id: check-merge-conflict
exclude: '.*\.(rst|inc)$'
- repo: https://github.com/Lucas-C/pre-commit-hooks
rev: v1.5.5
hooks:

View File

@@ -53,7 +53,7 @@ class BaseWatcherDirective(rst.Directive):
if not obj_raw_docstring:
# Raise a warning to make the tests fail with doc8
raise self.error("No docstring available for %s!" % obj)
raise self.error(f"No docstring available for {obj}!")
obj_docstring = inspect.cleandoc(obj_raw_docstring)
self.add_textblock(obj_docstring)
@@ -130,7 +130,7 @@ class WatcherFunc(BaseWatcherDirective):
def run(self):
if not self.content:
error = self.state_machine.reporter.error(
'The "%s" directive is empty; content required.' % self.name,
f'The "{self.name}" directive is empty; content required.',
nodes.literal_block(self.block_text, self.block_text),
line=self.lineno)
return [error]

View File

@@ -117,10 +117,9 @@ jQuery(document).ready(function(){
event_type = sample_file[0: -5]
html_str = self.TOGGLE_SCRIPT % ((event_type, ) * 3)
html_str += ("<input type='button' id='%s-hideshow' "
"value='hide/show sample'>" % event_type)
html_str += ("<div id='%s-div'><pre>%s</pre></div>"
% (event_type, sample_content))
html_str += (f"<input type='button' id='{event_type}-hideshow' "
"value='hide/show sample'>")
html_str += (f"<div id='{event_type}-div'><pre>{sample_content}</pre></div>")
raw = nodes.raw('', html_str, format="html")
col.append(raw)

View File

@@ -112,7 +112,7 @@ html_theme = 'openstackdocs'
# html_theme_options = {}
# Output file base name for HTML help builder.
htmlhelp_basename = '%sdoc' % project
htmlhelp_basename = f'{project}doc'
# openstackdocstheme options

View File

@@ -9,8 +9,6 @@ target-version = "py310"
[tool.ruff.lint]
select = ["E4", "E7", "E9", "F", "S", "U", "W", "C90"]
ignore = [
"UP031", # Use format specifiers instead of percent format
"UP032", # Use f-string instead of `format` call
# we only use asserts for type narrowing
"S101",
# we do not use random number geneerators for crypto

View File

@@ -79,7 +79,7 @@ class Version:
headers, default_version, latest_version)
def __repr__(self):
return '{}.{}'.format(self.major, self.minor)
return f'{self.major}.{self.minor}'
@staticmethod
def parse_headers(headers, default_version, latest_version):
@@ -121,7 +121,7 @@ class Version:
if len(version) != 2:
raise exc.HTTPNotAcceptable(
"Invalid value for %s header" % Version.string)
f"Invalid value for {Version.string} header")
return version
def __gt__(self, other):

View File

@@ -52,7 +52,7 @@ class Version(base.APIBase):
@staticmethod
def convert(id, status=APIStatus.CURRENT):
v = importlib.import_module('watcher.api.controllers.%s.versions' % id)
v = importlib.import_module(f'watcher.api.controllers.{id}.versions')
version = Version()
version.id = id
version.status = status

View File

@@ -242,21 +242,22 @@ class Controller(rest.RestController):
headers = {}
# ensure that major version in the URL matches the header
if version.major != versions.BASE_VERSION:
min_ver = versions.min_version_string()
max_ver = versions.max_version_string()
raise exc.HTTPNotAcceptable(
"Mutually exclusive versions requested. Version %(ver)s "
f"Mutually exclusive versions requested. Version {version} "
"requested but not supported by this service. The supported "
"version range is: [%(min)s, %(max)s]." %
{'ver': version, 'min': versions.min_version_string(),
'max': versions.max_version_string()},
f"version range is: [{min_ver}, {max_ver}].",
headers=headers)
# ensure the minor version is within the supported range
if version < min_version() or version > max_version():
min_ver = versions.min_version_string()
max_ver = versions.max_version_string()
raise exc.HTTPNotAcceptable(
"Version %(ver)s was requested but the minor version is not "
f"Version {version} was requested but the minor version is "
"not "
"supported by this service. The supported version range is: "
"[%(min)s, %(max)s]." %
{'ver': version, 'min': versions.min_version_string(),
'max': versions.max_version_string()},
f"[{min_ver}, {max_ver}].",
headers=headers)
@pecan.expose()

View File

@@ -177,17 +177,15 @@ class AuditPostType(wtypes.Base):
if self.strategy:
strategy = _get_object_by_value(context, objects.Strategy,
self.strategy)
self.name = "{}-{}".format(strategy.name,
timeutils.utcnow().isoformat())
self.name = f"{strategy.name}-{timeutils.utcnow().isoformat()}"
elif self.audit_template_uuid:
audit_template = objects.AuditTemplate.get(
context, self.audit_template_uuid)
self.name = "{}-{}".format(audit_template.name,
timeutils.utcnow().isoformat())
timestamp = timeutils.utcnow().isoformat()
self.name = f"{audit_template.name}-{timestamp}"
else:
goal = _get_object_by_value(context, objects.Goal, self.goal)
self.name = "{}-{}".format(goal.name,
timeutils.utcnow().isoformat())
self.name = f"{goal.name}-{timeutils.utcnow().isoformat()}"
# No more than 63 characters
if len(self.name) > 63:
LOG.warning("Audit: %s length exceeds 63 characters",

View File

@@ -177,7 +177,7 @@ class AuditTemplatePostType(wtypes.Base):
if strategy.goal_id != goal.id:
available_strategies = objects.Strategy.list(
AuditTemplatePostType._ctx)
choices = ["'{}' ({})".format(s.uuid, s.name)
choices = [f"'{s.uuid}' ({s.name})"
for s in available_strategies]
raise exception.InvalidStrategy(
message=_(

View File

@@ -40,10 +40,9 @@ class Collection(base.APIBase):
return wtypes.Unset
resource_url = url or self._type
q_args = ''.join(['{}={}&'.format(key, kwargs[key]) for key in kwargs])
next_args = '?%(args)slimit=%(limit)d&marker=%(marker)s' % {
'args': q_args, 'limit': limit,
'marker': getattr(self.collection[-1], marker_field)}
q_args = ''.join([f'{key}={kwargs[key]}&' for key in kwargs])
marker = getattr(self.collection[-1], marker_field)
next_args = f'?{q_args}limit={limit:d}&marker={marker}'
return link.Link.make_link('next', pecan.request.host_url,
resource_url, next_args).href

View File

@@ -98,13 +98,13 @@ def apply_jsonpatch(doc, patch):
def get_patch_value(patch, key):
for p in patch:
if p['op'] == 'replace' and p['path'] == '/%s' % key:
if p['op'] == 'replace' and p['path'] == f'/{key}':
return p['value']
def set_patch_value(patch, key, value):
for p in patch:
if p['op'] == 'replace' and p['path'] == '/%s' % key:
if p['op'] == 'replace' and p['path'] == f'/{key}':
p['value'] = value

View File

@@ -31,10 +31,8 @@ class VERSIONS(enum.Enum):
# This is the version 1 API
BASE_VERSION = 1
# String representations of the minor and maximum versions
_MIN_VERSION_STRING = '{}.{}'.format(BASE_VERSION,
VERSIONS.MINOR_0_ROCKY.value)
_MAX_VERSION_STRING = '{}.{}'.format(BASE_VERSION,
VERSIONS.MINOR_MAX_VERSION.value)
_MIN_VERSION_STRING = f'{BASE_VERSION}.{VERSIONS.MINOR_0_ROCKY.value}'
_MAX_VERSION_STRING = f'{BASE_VERSION}.{VERSIONS.MINOR_MAX_VERSION.value}'
def service_type_string():

View File

@@ -78,8 +78,8 @@ class ParsableErrorMiddleware:
'error_message', text='\n'.join(app_iter)))]
except et.ElementTree.ParseError as err:
LOG.error('Error parsing HTTP response: %s', err)
body = ['<error_message>%s'
'</error_message>' % state['status_code']]
body = ['<error_message>{}'
'</error_message>'.format(state['status_code'])]
state['headers'].append(('Content-Type', 'application/xml'))
else:
app_iter = [i.decode('utf-8') for i in app_iter]

View File

@@ -129,8 +129,8 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
class TaskFlowActionContainer(base.BaseTaskFlowActionContainer):
def __init__(self, db_action, engine):
self.name = "action_type:{} uuid:{}".format(db_action.action_type,
db_action.uuid)
self.name = (f"action_type:{db_action.action_type} "
f"uuid:{db_action.uuid}")
super().__init__(self.name,
db_action,
engine)

View File

@@ -157,9 +157,9 @@ class CinderHelper:
time.sleep(retry_interval)
if getattr(volume, 'migration_status') == 'error':
host_name = getattr(volume, 'os-vol-host-attr:host')
error_msg = (("Volume migration error : "
"volume %(volume)s is now on host '%(host)s'.") %
{'volume': volume.id, 'host': host_name})
error_msg = ("Volume migration error : "
f"volume {volume.id} is now on host "
f"'{host_name}'.")
LOG.error(error_msg)
return False
@@ -173,9 +173,8 @@ class CinderHelper:
return False
else:
host_name = getattr(volume, 'os-vol-host-attr:host')
error_msg = (("Volume migration error : "
"volume %(volume)s is now on host '%(host)s'.") %
{'volume': volume.id, 'host': host_name})
error_msg = ("Volume migration error : "
f"volume {volume.id} is now on host '{host_name}'.")
LOG.error(error_msg)
return False
LOG.debug(

View File

@@ -56,9 +56,8 @@ def check_min_nova_api_version(config_version):
"""
min_required = nova_api_versions.APIVersion(MIN_NOVA_API_VERSION)
if nova_api_versions.APIVersion(config_version) < min_required:
raise ValueError('Invalid nova_client.api_version %s. %s or '
'greater is required.' % (config_version,
MIN_NOVA_API_VERSION))
raise ValueError(f'Invalid nova_client.api_version {config_version}. '
f'{MIN_NOVA_API_VERSION} or greater is required.')
class OpenStackClients:
@@ -99,7 +98,7 @@ class OpenStackClients:
return self._session
def _get_client_option(self, client, option):
return getattr(getattr(CONF, '%s_client' % client), option)
return getattr(getattr(CONF, f'{client}_client'), option)
@exception.wrap_keystone_exception
def keystone(self):

View File

@@ -75,7 +75,7 @@ class RequestContext(context.RequestContext):
return values
def __str__(self):
return "<Context %s>" % self.to_dict()
return f"<Context {self.to_dict()}>"
def make_context(*args, **kwargs):

View File

@@ -59,7 +59,7 @@ class BaseMetalNode(abc.ABC):
self.power_off()
else:
raise exception.UnsupportedActionType(
"Cannot set power state: %s" % state)
f"Cannot set power state: {state}")
class BaseMetalHelper(abc.ABC):

View File

@@ -195,8 +195,9 @@ class NovaHelper:
break
time.sleep(poll_interval)
else:
raise Exception("Volume %s did not reach status %s after %d s"
% (volume.id, status, timeout))
raise Exception(
f"Volume {volume.id} did not reach status {status} "
f"after {timeout:d} s")
return volume.status == status
def watcher_non_live_migrate_instance(self, instance_id, dest_hostname,
@@ -432,7 +433,7 @@ class NovaHelper:
else:
raise Exception("Live migration execution and abort both failed "
"for the instance %s" % instance_id)
f"for the instance {instance_id}")
def enable_service_nova_compute(self, hostname):
if (api_versions.APIVersion(version_str=CONF.nova_client.api_version) <
@@ -701,7 +702,7 @@ class NovaHelper:
security_groups=sec_group_list,
nics=net_list,
block_device_mapping_v2=block_device_mapping_v2,
availability_zone="{}:{}".format(azone, node_id))
availability_zone=f"{azone}:{node_id}")
# Poll at 5 second intervals, until the status is no longer 'BUILD'
if instance:

View File

@@ -52,7 +52,7 @@ class PlacementHelper:
"""
url = '/resource_providers'
if rp_name:
url += '?name=%s' % rp_name
url += f'?name={rp_name}'
resp = self.get(url)
if resp.status_code == HTTPStatus.OK:
json_resp = resp.json()
@@ -76,7 +76,7 @@ class PlacementHelper:
:param rp_uuid: UUID of the resource provider to get.
:return: A dictionary of inventories keyed by resource classes.
"""
url = '/resource_providers/%s/inventories' % rp_uuid
url = f'/resource_providers/{rp_uuid}/inventories'
resp = self.get(url)
if resp.status_code == HTTPStatus.OK:
json = resp.json()
@@ -96,7 +96,7 @@ class PlacementHelper:
:param rp_uuid: UUID of the resource provider to grab traits for.
:return: A list of traits.
"""
resp = self.get("/resource_providers/%s/traits" % rp_uuid)
resp = self.get(f"/resource_providers/{rp_uuid}/traits")
if resp.status_code == HTTPStatus.OK:
json = resp.json()
@@ -117,7 +117,7 @@ class PlacementHelper:
:return: A dictionary of allocation records keyed by resource
provider uuid.
"""
url = '/allocations/%s' % consumer_uuid
url = f'/allocations/{consumer_uuid}'
resp = self.get(url)
if resp.status_code == HTTPStatus.OK:
json = resp.json()
@@ -138,7 +138,7 @@ class PlacementHelper:
:return: A dictionary that describes how much each class of
resource is being consumed on this resource provider.
"""
url = '/resource_providers/%s/usages' % rp_uuid
url = f'/resource_providers/{rp_uuid}/usages'
resp = self.get(url)
if resp.status_code == HTTPStatus.OK:
json = resp.json()
@@ -163,7 +163,7 @@ class PlacementHelper:
:returns: A dict, keyed by resource provider UUID, which can
provide the required resources.
"""
url = "/allocation_candidates?%s" % resources
url = f"/allocation_candidates?{resources}"
resp = self.get(url)
if resp.status_code == HTTPStatus.OK:
data = resp.json()

View File

@@ -189,5 +189,5 @@ def async_compat_call(f, *args, **kwargs):
# timeout exceptions to asyncio timeout errors.
with eventlet.timeout.Timeout(
seconds=timeout,
exception=asyncio.TimeoutError("Timeout: %ss" % timeout)):
exception=asyncio.TimeoutError(f"Timeout: {timeout}s")):
return tpool.execute(tpool_wrapper)

View File

@@ -25,16 +25,16 @@ nova_client = cfg.OptGroup(name='nova_client',
NOVA_CLIENT_OPTS = [
cfg.StrOpt('api_version',
default='2.56',
help="""
help=f"""
Version of Nova API to use in novaclient.
Minimum required version: %s
Minimum required version: {clients.MIN_NOVA_API_VERSION}
Certain Watcher features depend on a minimum version of the compute
API being available which is enforced with this option. See
https://docs.openstack.org/nova/latest/reference/api-microversion-history.html
for the compute API microversion history.
""" % clients.MIN_NOVA_API_VERSION),
"""),
cfg.StrOpt('endpoint_type',
default='publicURL',
choices=['public', 'internal', 'admin',

View File

@@ -63,9 +63,9 @@ def _import_modules(module_names):
for modname in module_names:
mod = importlib.import_module("watcher.conf." + modname)
if not hasattr(mod, LIST_OPTS_FUNC_NAME):
msg = "The module 'watcher.conf.%s' should have a '%s' "\
"function which returns the config options." % \
(modname, LIST_OPTS_FUNC_NAME)
msg = (f"The module 'watcher.conf.{modname}' should have a "
f"'{LIST_OPTS_FUNC_NAME}' function which returns the "
"config options.")
raise Exception(msg)
else:
imported_modules.append(mod)

View File

@@ -87,7 +87,7 @@ class WatcherObjectsMap:
out = ""
for key, vals in zip(self.keys(), self.values()):
ids = [val.id for val in vals]
out += "%(key)s: %(val)s" % (dict(key=key, val=ids))
out += "{key}: {val}".format(**dict(key=key, val=ids))
out += "\n"
return out

View File

@@ -197,7 +197,7 @@ class Connection(api.BaseConnection):
if 'deleted' in filters:
deleted_filter = filters.pop('deleted')
op = 'eq' if not bool(deleted_filter) else 'neq'
filters['deleted__%s' % op] = 0
filters[f'deleted__{op}'] = 0
plain_fields = tuple(
(list(plain_fields) or []) +

View File

@@ -57,10 +57,10 @@ class JsonEncodedType(TypeDecorator):
# interface the consistent.
value = self.type()
elif not isinstance(value, self.type):
raise TypeError("%s supposes to store %s objects, but %s given"
% (self.__class__.__name__,
self.type.__name__,
type(value).__name__))
raise TypeError(
f"{self.__class__.__name__} supposes to store "
f"{self.type.__name__} objects, but "
f"{type(value).__name__} given")
serialized_value = jsonutils.dumps(value)
return serialized_value

View File

@@ -80,7 +80,7 @@ class GnocchiHelper(base.DataSourceBase):
resource_id = resource.uuid
if resource_type == 'compute_node':
resource_id = "{}_{}".format(resource.hostname, resource.hostname)
resource_id = resource.hostname + "_" + resource.hostname
kwargs = dict(query={"=": {"original_resource_id": resource_id}},
limit=1)
resources = self.query_retry(
@@ -162,7 +162,7 @@ class GnocchiHelper(base.DataSourceBase):
resource_id = resource.uuid
if resource_type == 'compute_node':
resource_id = "{}_{}".format(resource.hostname, resource.hostname)
resource_id = resource.hostname + "_" + resource.hostname
kwargs = dict(query={"=": {"original_resource_id": resource_id}},
limit=1)
resources = self.query_retry(

View File

@@ -56,19 +56,18 @@ class PrometheusHelper(prometheus_base.PrometheusBase):
"""
def _validate_host_port(host, port):
if len(host) > 255:
return (False, "hostname is too long: '%s'" % host)
return (False, f"hostname is too long: '{host}'")
if host[-1] == '.':
host = host[:-1]
legal_hostname = re.compile(
"(?!-)[a-z0-9-]{1,63}(?<!-)$", re.IGNORECASE)
if not all(legal_hostname.match(host_part)
for host_part in host.split(".")):
return (False, "hostname '%s' failed regex match " % host)
return (False, f"hostname '{host}' failed regex match ")
try:
assert bool(1 <= int(port) <= 65535)
except (AssertionError, ValueError):
return (False, "missing or invalid port number '%s' "
% port)
return (False, f"missing or invalid port number '{port}' ")
return (True, "all good")
_host = CONF.prometheus_client.host
@@ -91,7 +90,7 @@ class PrometheusHelper(prometheus_base.PrometheusBase):
% {'host': _host, 'port': _port, 'reason': reason})
)
the_client = prometheus_client.PrometheusAPIClient(
"{}:{}".format(_host, _port))
f"{_host}:{_port}")
# check if tls options or basic_auth options are set and use them
prometheus_user = CONF.prometheus_client.username

View File

@@ -268,27 +268,25 @@ class PrometheusBase(base.DataSourceBase):
if meter == 'node_cpu_seconds_total':
query_args = (
"100 - (%(agg)s by (%(label)s)(rate(%(meter)s"
"{mode='idle',%(label)s='%(label_value)s'}[%(period)ss])) "
f"100 - ({aggregate} by ({self.prometheus_fqdn_label})"
f"(rate({meter}{{mode='idle',"
f"{self.prometheus_fqdn_label}='{instance_label}'}}"
f"[{period}s])) "
"* 100)"
% {'label': self.prometheus_fqdn_label,
'label_value': instance_label, 'agg': aggregate,
'meter': meter, 'period': period}
)
elif meter == 'node_memory_MemAvailable_bytes':
# Prometheus metric is in B and we need to return KB
query_args = (
"(node_memory_MemTotal_bytes{%(label)s='%(label_value)s'} "
"- %(agg)s_over_time(%(meter)s{%(label)s='%(label_value)s'}"
"[%(period)ss])) / 1024"
% {'label': self.prometheus_fqdn_label,
'label_value': instance_label, 'agg': aggregate,
'meter': meter, 'period': period}
f"(node_memory_MemTotal_bytes{{"
f"{self.prometheus_fqdn_label}='{instance_label}'}} "
f"- {aggregate}_over_time({meter}{{"
f"{self.prometheus_fqdn_label}='{instance_label}'}}"
f"[{period}s])) / 1024"
)
elif meter == 'ceilometer_memory_usage':
query_args = (
"%s_over_time(%s{%s='%s'}[%ss])" %
(aggregate, meter, uuid_label_key, instance_label, period)
f"{aggregate}_over_time({meter}{{"
f"{uuid_label_key}='{instance_label}'}}[{period}s])"
)
elif meter == 'ceilometer_cpu':
# We are converting the total cumulative cpu time (ns) to cpu usage
@@ -304,12 +302,9 @@ class PrometheusBase(base.DataSourceBase):
)
vcpus = 1
query_args = (
"clamp_max((%(agg)s by (%(label)s)"
"(rate(%(meter)s{%(label)s='%(label_value)s'}[%(period)ss]))"
"/10e+8) *(100/%(vcpus)s), 100)"
% {'label': uuid_label_key, 'label_value': instance_label,
'agg': aggregate, 'meter': meter, 'period': period,
'vcpus': vcpus}
f"clamp_max(({aggregate} by ({uuid_label_key})"
f"(rate({meter}{{{uuid_label_key}='{instance_label}'}}"
f"[{period}s]))/10e+8) *(100/{vcpus}), 100)"
)
else:
raise exception.InvalidParameter(

View File

@@ -84,8 +84,7 @@ class StrategyEndpoint:
if not datasource:
state = "Datasource is not presented for this strategy"
else:
state = "{}: {}".format(datasource.NAME,
datasource.check_availability())
state = f"{datasource.NAME}: {datasource.check_availability()}"
return {'type': 'Datasource',
'state': state,
'mandatory': True, 'comment': ''}
@@ -207,7 +206,7 @@ class BaseStrategy(loadable.Loadable, metaclass=abc.ABCMeta):
"datasources",
help="Datasources to use in order to query the needed metrics."
" This option overrides the global preference."
" options: {}".format(datasources_ops),
f" options: {datasources_ops}",
item_type=cfg.types.String(choices=datasources_ops),
default=None)
]

View File

@@ -283,7 +283,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
host_avg_cpu_util = self.get_compute_node_cpu_usage(node)
if host_avg_cpu_util is None:
resource_id = "{}_{}".format(node.uuid, node.hostname)
resource_id = f"{node.uuid}_{node.hostname}"
LOG.error(
"No values returned by %(resource_id)s "
"for %(metric_name)s", dict(

View File

@@ -95,7 +95,7 @@ class DummyWithScorer(base.DummyBaseStrategy):
# Parse the result using workloads from scorer's metainfo
result = self._workloads[jsonutils.loads(result_str)[0]]
LOG.debug('Detected Workload: %s', result)
parameters = {'message': 'Detected Workload: %s' % result}
parameters = {'message': f'Detected Workload: {result}'}
self.solution.add_action(action_type=self.NOP,
input_parameters=parameters)
@@ -105,7 +105,7 @@ class DummyWithScorer(base.DummyBaseStrategy):
LOG.debug('AVG Scorer result: %s', result_str)
result = jsonutils.loads(result_str)[0]
LOG.debug('AVG Scorer result (parsed): %d', result)
parameters = {'message': 'AVG Scorer result: %s' % result}
parameters = {'message': f'AVG Scorer result: {result}'}
self.solution.add_action(action_type=self.NOP,
input_parameters=parameters)

View File

@@ -79,7 +79,7 @@ def use_jsonutils(logical_line, filename):
if "json." in logical_line:
json_funcs = ['dumps(', 'dump(', 'loads(', 'load(']
for f in json_funcs:
pos = logical_line.find('json.%s' % f)
pos = logical_line.find(f'json.{f}')
if pos != -1:
yield (pos, msg % {'fun': f[:-1]})
@@ -97,7 +97,7 @@ def no_translate_debug_logs(logical_line, filename):
N319
"""
for hint in _all_hints:
if logical_line.startswith("LOG.debug(%s(" % hint):
if logical_line.startswith(f"LOG.debug({hint}("):
yield (0, "N319 Don't translate debug level logs")
@@ -178,7 +178,7 @@ def check_assertempty(logical_line, filename):
"assertEqual(observed, *empty*). *empty* contains "
"{}, [], (), set(), '', \"\"")
empties = r"(\[\s*\]|\{\s*\}|\(\s*\)|set\(\s*\)|'\s*'|\"\s*\")"
reg = r"assertEqual\(([^,]*,\s*)+?%s\)\s*$" % empties
reg = rf"assertEqual\(([^,]*,\s*)+?{empties}\)\s*$"
if re.search(reg, logical_line):
yield (0, msg)
@@ -223,14 +223,14 @@ def check_oslo_i18n_wrapper(logical_line, filename, noqa):
split_line = logical_line.split()
modulename = os.path.normpath(filename).split('/')[0]
bad_i18n_module = '%s.i18n' % modulename
bad_i18n_module = f'{modulename}.i18n'
if (len(split_line) > 1 and split_line[0] in ('import', 'from')):
if (split_line[1] == bad_i18n_module or
modulename != 'watcher' and split_line[1] in ('watcher.i18n',
'watcher._i18n')):
msg = ("N340: %(found)s is found. Use %(module)s._i18n instead."
% {'found': split_line[1], 'module': modulename})
msg = (f"N340: {split_line[1]} is found. Use "
f"{modulename}._i18n instead.")
yield (0, msg)
@@ -249,14 +249,14 @@ def check_builtins_gettext(logical_line, tokens, filename, lines, noqa):
modulename = os.path.normpath(filename).split('/')[0]
if '%s/tests' % modulename in filename:
if f'{modulename}/tests' in filename:
return
if os.path.basename(filename) in ('i18n.py', '_i18n.py'):
return
token_values = [t[1] for t in tokens]
i18n_wrapper = '%s._i18n' % modulename
i18n_wrapper = f'{modulename}._i18n'
if '_' in token_values:
i18n_import_line_found = False
@@ -269,7 +269,7 @@ def check_builtins_gettext(logical_line, tokens, filename, lines, noqa):
break
if not i18n_import_line_found:
msg = ("N341: _ from python builtins module is used. "
"Use _ from %s instead." % i18n_wrapper)
f"Use _ from {i18n_wrapper} instead.")
yield (0, msg)

View File

@@ -72,9 +72,9 @@ class EventType(NotificationObject):
def to_notification_event_type_field(self):
"""Serialize the object to the wire format."""
s = '{}.{}'.format(self.object, self.action)
s = f'{self.object}.{self.action}'
if self.obj_attr_is_set('phase'):
s += '.%s' % self.phase
s += f'.{self.phase}'
return s
@@ -192,8 +192,7 @@ class NotificationBase(NotificationObject):
self._emit(
context,
event_type=self.event_type.to_notification_event_type_field(),
publisher_id='{}:{}'.format(self.publisher.binary,
self.publisher.host),
publisher_id=f'{self.publisher.binary}:{self.publisher.host}',
payload=self.payload.obj_to_primitive())

View File

@@ -78,7 +78,7 @@ class FunctionalTest(base.DbTestCase):
hooks.ContextHook(),
hooks.NoExceptionTracebackHook()
],
'template_path': '%s/api/templates' % root_dir,
'template_path': f'{root_dir}/api/templates',
'enable_acl': enable_acl,
'acl_public_routes': ['/', '/v1'],
},
@@ -105,7 +105,7 @@ class FunctionalTest(base.DbTestCase):
"""
full_path = path_prefix + path
response = getattr(self.app, "%s_json" % method)(
response = getattr(self.app, f"{method}_json")(
str(full_path),
params=params,
headers=headers,
@@ -219,7 +219,7 @@ class FunctionalTest(base.DbTestCase):
}
for query in q:
for name in ['field', 'op', 'value']:
query_params['q.%s' % name].append(query.get(name, ''))
query_params[f'q.{name}'].append(query.get(name, ''))
all_params = {}
all_params.update(params)
if q:

View File

@@ -137,9 +137,8 @@ class TestNoExceptionTracebackHook(base.FunctionalTest):
# instead of'\n'.join(trace). But since RemoteError is kind of very
# rare thing (happens due to wrong deserialization settings etc.)
# we don't care about this garbage.
expected_msg = ("Remote error: %s %s"
% (test_exc_type, self.MSG_WITHOUT_TRACE) +
"\n['")
expected_msg = (f"Remote error: {test_exc_type} "
f"{self.MSG_WITHOUT_TRACE}\n['")
actual_msg = jsonutils.loads(
response.json['error_message'])['faultstring']
self.assertEqual(expected_msg, actual_msg)

View File

@@ -34,7 +34,7 @@ class FakeMemcache:
"""Fake cache that is used for keystone tokens lookup."""
_cache = {
'tokens/%s' % ADMIN_TOKEN: {
f'tokens/{ADMIN_TOKEN}': {
'access': {
'token': {'id': ADMIN_TOKEN,
'expires': '2100-09-11T00:00:00'},
@@ -46,7 +46,7 @@ class FakeMemcache:
},
}
},
'tokens/%s' % MEMBER_TOKEN: {
f'tokens/{MEMBER_TOKEN}': {
'access': {
'token': {'id': MEMBER_TOKEN,
'expires': '2100-09-11T00:00:00'},

View File

@@ -116,7 +116,7 @@ class TestListAction(api_base.FunctionalTest):
def test_show(self):
action = obj_utils.create_test_action(self.context, parents=None)
response = self.get_json('/actions/%s' % action['uuid'])
response = self.get_json('/actions/{}'.format(action['uuid']))
self.assertEqual(action.uuid, response['uuid'])
self.assertEqual(action.action_type, response['action_type'])
self.assertEqual(action.input_parameters, response['input_parameters'])
@@ -127,7 +127,7 @@ class TestListAction(api_base.FunctionalTest):
action = obj_utils.create_test_action(
self.context, parents=None, status_message='test')
response = self.get_json(
'/actions/%s' % action['uuid'],
'/actions/{}'.format(action['uuid']),
headers={'OpenStack-API-Version': 'infra-optim 1.5'})
self.assertEqual(action.uuid, response['uuid'])
self.assertEqual(action.action_type, response['action_type'])
@@ -139,7 +139,7 @@ class TestListAction(api_base.FunctionalTest):
action = obj_utils.create_test_action(
self.context, parents=None, status_message='test')
response = self.get_json(
'/actions/%s' % action['uuid'],
'/actions/{}'.format(action['uuid']),
headers={'OpenStack-API-Version': 'infra-optim 1.4'})
self.assertEqual(action.uuid, response['uuid'])
self.assertEqual(action.action_type, response['action_type'])
@@ -150,7 +150,7 @@ class TestListAction(api_base.FunctionalTest):
def test_show_with_empty_status_message(self):
action = obj_utils.create_test_action(self.context, parents=None)
response = self.get_json(
'/actions/%s' % action['uuid'],
'/actions/{}'.format(action['uuid']),
headers={'OpenStack-API-Version': 'infra-optim 1.5'})
self.assertEqual(action.uuid, response['uuid'])
self.assertEqual(action.action_type, response['action_type'])
@@ -161,12 +161,12 @@ class TestListAction(api_base.FunctionalTest):
def test_show_soft_deleted(self):
action = obj_utils.create_test_action(self.context, parents=None)
action.soft_delete()
response = self.get_json('/actions/%s' % action['uuid'],
response = self.get_json('/actions/{}'.format(action['uuid']),
headers={'X-Show-Deleted': 'True'})
self.assertEqual(action.uuid, response['uuid'])
self._assert_action_fields(response)
response = self.get_json('/actions/%s' % action['uuid'],
response = self.get_json('/actions/{}'.format(action['uuid']),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -189,7 +189,7 @@ class TestListAction(api_base.FunctionalTest):
def test_show_detail(self):
action = obj_utils.create_test_action(self.context, parents=None)
response = self.get_json('/actions/%s/detail' % action['uuid'],
response = self.get_json('/actions/{}/detail'.format(action['uuid']),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -249,7 +249,7 @@ class TestListAction(api_base.FunctionalTest):
action_plan_id=action_plan_2.id,
uuid=utils.generate_uuid())
response = self.get_json('/actions?audit_uuid=%s' % self.audit.uuid)
response = self.get_json(f'/actions?audit_uuid={self.audit.uuid}')
self.assertEqual(len(action_list), len(response['actions']))
for action in response['actions']:
self.assertEqual(action_plan_1.uuid, action['action_plan_uuid'])
@@ -280,13 +280,13 @@ class TestListAction(api_base.FunctionalTest):
uuid=utils.generate_uuid())
response = self.get_json(
'/actions?action_plan_uuid=%s' % action_plan_1.uuid)
f'/actions?action_plan_uuid={action_plan_1.uuid}')
self.assertEqual(len(action_list), len(response['actions']))
for action in response['actions']:
self.assertEqual(action_plan_1.uuid, action['action_plan_uuid'])
response = self.get_json(
'/actions?action_plan_uuid=%s' % action_plan_2.uuid)
f'/actions?action_plan_uuid={action_plan_2.uuid}')
for action in response['actions']:
self.assertEqual(action_plan_2.uuid, action['action_plan_uuid'])
@@ -303,7 +303,7 @@ class TestListAction(api_base.FunctionalTest):
uuid=utils.generate_uuid())
response = self.get_json(
'/actions/detail?action_plan_uuid=%s' % action_plan.uuid)
f'/actions/detail?action_plan_uuid={action_plan.uuid}')
for action in response['actions']:
self.assertEqual(action_plan.uuid, action['action_plan_uuid'])
@@ -320,7 +320,7 @@ class TestListAction(api_base.FunctionalTest):
uuid=utils.generate_uuid())
response = self.get_json(
'/actions/detail?audit_uuid=%s' % self.audit.uuid)
f'/actions/detail?audit_uuid={self.audit.uuid}')
for action in response['actions']:
self.assertEqual(action_plan.uuid, action['action_plan_uuid'])
@@ -329,8 +329,8 @@ class TestListAction(api_base.FunctionalTest):
self.context,
uuid=utils.generate_uuid(),
audit_id=self.audit.id)
url = '/actions?action_plan_uuid={}&audit_uuid={}'.format(
action_plan.uuid, self.audit.uuid)
url = (f'/actions?action_plan_uuid={action_plan.uuid}&'
f'audit_uuid={self.audit.uuid}')
response = self.get_json(url, expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
@@ -348,7 +348,7 @@ class TestListAction(api_base.FunctionalTest):
uuid=utils.generate_uuid())
actions_list.append(action)
response = self.get_json('/actions?sort_key=%s' % 'uuid')
response = self.get_json('/actions?sort_key={}'.format('uuid'))
names = [s['uuid'] for s in response['actions']]
self.assertEqual(
@@ -387,11 +387,11 @@ class TestListAction(api_base.FunctionalTest):
self.assertEqual(
sorted(action_plans_uuid_list, reverse=(direction == 'desc')),
action_plan_uuids,
message='Failed on %s direction' % direction)
message=f'Failed on {direction} direction')
def test_list_sort_key_validation(self):
response = self.get_json(
'/actions?sort_key=%s' % 'bad_name',
'/actions?sort_key={}'.format('bad_name'),
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
@@ -426,7 +426,7 @@ class TestListAction(api_base.FunctionalTest):
action_plan1.state = objects.action_plan.State.CANCELLED
action_plan1.save()
self.delete('/action_plans/%s' % action_plan1.uuid)
self.delete(f'/action_plans/{action_plan1.uuid}')
response = self.get_json('/actions')
# We deleted the actions from the 1st action plan so we've got 2 left
@@ -496,7 +496,7 @@ class TestListAction(api_base.FunctionalTest):
def test_show_with_links(self):
uuid = utils.generate_uuid()
obj_utils.create_test_action(self.context, id=1, uuid=uuid)
response = self.get_json('/actions/%s' % uuid)
response = self.get_json(f'/actions/{uuid}')
self.assertIn('links', response.keys())
self.assertEqual(2, len(response['links']))
self.assertIn(uuid, response['links'][0]['href'])
@@ -549,12 +549,12 @@ class TestPatchAction(api_base.FunctionalTest):
def test_patch_action_not_allowed_old_microversion(self):
"""Test that action patch is not allowed in older microversions"""
new_state = objects.action.State.SKIPPED
response = self.get_json('/actions/%s' % self.action.uuid)
response = self.get_json(f'/actions/{self.action.uuid}')
self.assertNotEqual(new_state, response['state'])
# Test with API version 1.4 (should fail)
response = self.patch_json(
'/actions/%s' % self.action.uuid,
f'/actions/{self.action.uuid}',
[{'path': '/state', 'value': new_state, 'op': 'replace'}],
headers={'OpenStack-API-Version': 'infra-optim 1.4'},
expect_errors=True)
@@ -565,12 +565,12 @@ class TestPatchAction(api_base.FunctionalTest):
def test_patch_action_allowed_new_microversion(self):
"""Test that action patch is allowed in microversion 1.5+"""
new_state = objects.action.State.SKIPPED
response = self.get_json('/actions/%s' % self.action.uuid)
response = self.get_json(f'/actions/{self.action.uuid}')
self.assertNotEqual(new_state, response['state'])
# Test with API version 1.5 (should succeed)
response = self.patch_json(
'/actions/%s' % self.action.uuid,
f'/actions/{self.action.uuid}',
[{'path': '/state', 'value': new_state, 'op': 'replace'}],
headers={'OpenStack-API-Version': 'infra-optim 1.5'})
self.assertEqual('application/json', response.content_type)
@@ -584,7 +584,7 @@ class TestPatchAction(api_base.FunctionalTest):
# Try to transition from PENDING to SUCCEEDED (should fail)
new_state = objects.action.State.SUCCEEDED
response = self.patch_json(
'/actions/%s' % self.action.uuid,
f'/actions/{self.action.uuid}',
[{'path': '/state', 'value': new_state, 'op': 'replace'}],
headers={'OpenStack-API-Version': 'infra-optim 1.5'},
expect_errors=True)
@@ -604,7 +604,7 @@ class TestPatchAction(api_base.FunctionalTest):
self.action_plan.save()
new_state = objects.action.State.SKIPPED
response = self.patch_json(
'/actions/%s' % self.action.uuid,
f'/actions/{self.action.uuid}',
[{'path': '/state', 'value': new_state, 'op': 'replace'}],
headers={'OpenStack-API-Version': 'infra-optim 1.5'},
expect_errors=True)
@@ -618,7 +618,7 @@ class TestPatchAction(api_base.FunctionalTest):
"""Test that PENDING to SKIPPED transition is allowed"""
new_state = objects.action.State.SKIPPED
response = self.patch_json(
'/actions/%s' % self.action.uuid,
f'/actions/{self.action.uuid}',
[{'path': '/state', 'value': new_state, 'op': 'replace'},
{'path': '/status_message', 'value': 'test message',
'op': 'replace'}],
@@ -634,7 +634,7 @@ class TestPatchAction(api_base.FunctionalTest):
"""Test that invalid state values are rejected"""
invalid_state = "INVALID_STATE"
response = self.patch_json(
'/actions/%s' % self.action.uuid,
f'/actions/{self.action.uuid}',
[{'path': '/state', 'value': invalid_state, 'op': 'replace'}],
headers={'OpenStack-API-Version': 'infra-optim 1.5'},
expect_errors=True)
@@ -645,7 +645,7 @@ class TestPatchAction(api_base.FunctionalTest):
def test_patch_action_remove_status_message_not_allowed(self):
"""Test that remove fields is not allowed"""
response = self.patch_json(
'/actions/%s' % self.action.uuid,
f'/actions/{self.action.uuid}',
[{'path': '/status_message', 'op': 'remove'}],
headers={'OpenStack-API-Version': 'infra-optim 1.5'},
expect_errors=True)
@@ -658,7 +658,7 @@ class TestPatchAction(api_base.FunctionalTest):
def test_patch_action_status_message_not_allowed(self):
"""Test status_message cannot be patched directly when not SKIPPED"""
response = self.patch_json(
'/actions/%s' % self.action.uuid,
f'/actions/{self.action.uuid}',
[{'path': '/status_message', 'value': 'test message',
'op': 'replace'}],
headers={'OpenStack-API-Version': 'infra-optim 1.5'},
@@ -673,7 +673,7 @@ class TestPatchAction(api_base.FunctionalTest):
"""Test that status_message cannot be patched directly"""
new_state = objects.action.State.SKIPPED
response = self.patch_json(
'/actions/%s' % self.action.uuid,
f'/actions/{self.action.uuid}',
[{'path': '/state', 'value': new_state, 'op': 'replace'},
{'path': '/action_plan_id', 'value': 56, 'op': 'replace'}],
headers={'OpenStack-API-Version': 'infra-optim 1.5'},
@@ -690,7 +690,7 @@ class TestPatchAction(api_base.FunctionalTest):
# First transition to SKIPPED state
new_state = objects.action.State.SKIPPED
response = self.patch_json(
'/actions/%s' % self.action.uuid,
f'/actions/{self.action.uuid}',
[{'path': '/state', 'value': new_state, 'op': 'replace'},
{'path': '/status_message', 'value': 'initial message',
'op': 'replace'}],
@@ -700,7 +700,7 @@ class TestPatchAction(api_base.FunctionalTest):
# Now update status_message while in SKIPPED state
response = self.patch_json(
'/actions/%s' % self.action.uuid,
f'/actions/{self.action.uuid}',
[{'path': '/status_message', 'value': 'updated message',
'op': 'replace'}],
headers={'OpenStack-API-Version': 'infra-optim 1.5'})
@@ -730,7 +730,7 @@ class TestActionPolicyEnforcement(api_base.FunctionalTest):
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
f"Policy doesn't allow {rule} to be performed.",
jsonutils.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
@@ -742,7 +742,7 @@ class TestActionPolicyEnforcement(api_base.FunctionalTest):
action = obj_utils.create_test_action(self.context)
self._common_policy_check(
"action:get", self.get_json,
'/actions/%s' % action.uuid,
f'/actions/{action.uuid}',
expect_errors=True)
def test_policy_disallow_detail(self):
@@ -755,7 +755,7 @@ class TestActionPolicyEnforcement(api_base.FunctionalTest):
action = obj_utils.create_test_action(self.context)
self._common_policy_check(
"action:update", self.patch_json,
'/actions/%s' % action.uuid,
f'/actions/{action.uuid}',
[{'path': '/state', 'value': objects.action.State.SKIPPED,
'op': 'replace'}],
headers={'OpenStack-API-Version': 'infra-optim 1.5'},

View File

@@ -79,7 +79,8 @@ class TestListActionPlan(api_base.FunctionalTest):
action_plan = obj_utils.create_test_action_plan(self.context)
obj_utils.create_test_efficacy_indicator(
self.context, action_plan_id=action_plan['id'])
response = self.get_json('/action_plans/%s' % action_plan['uuid'])
response = self.get_json(
'/action_plans/{}'.format(action_plan['uuid']))
self.assertEqual(action_plan.uuid, response['uuid'])
self._assert_action_plans_fields(response)
self.assertEqual(
@@ -95,7 +96,7 @@ class TestListActionPlan(api_base.FunctionalTest):
obj_utils.create_test_efficacy_indicator(
self.context, action_plan_id=action_plan['id'])
response = self.get_json(
'/action_plans/%s' % action_plan['uuid'],
'/action_plans/{}'.format(action_plan['uuid']),
headers={'OpenStack-API-Version': 'infra-optim 1.5'})
self.assertEqual(action_plan.uuid, response['uuid'])
self._assert_action_plans_fields(response)
@@ -106,7 +107,7 @@ class TestListActionPlan(api_base.FunctionalTest):
obj_utils.create_test_efficacy_indicator(
self.context, action_plan_id=action_plan['id'])
response = self.get_json(
'/action_plans/%s' % action_plan['uuid'],
'/action_plans/{}'.format(action_plan['uuid']),
headers={'OpenStack-API-Version': 'infra-optim 1.5'})
self.assertEqual(action_plan.uuid, response['uuid'])
self._assert_action_plans_fields(response)
@@ -115,13 +116,15 @@ class TestListActionPlan(api_base.FunctionalTest):
def test_get_one_soft_deleted(self):
action_plan = obj_utils.create_test_action_plan(self.context)
action_plan.soft_delete()
response = self.get_json('/action_plans/%s' % action_plan['uuid'],
headers={'X-Show-Deleted': 'True'})
response = self.get_json(
'/action_plans/{}'.format(action_plan['uuid']),
headers={'X-Show-Deleted': 'True'})
self.assertEqual(action_plan.uuid, response['uuid'])
self._assert_action_plans_fields(response)
response = self.get_json('/action_plans/%s' % action_plan['uuid'],
expect_errors=True)
response = self.get_json(
'/action_plans/{}'.format(action_plan['uuid']),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
def test_detail(self):
@@ -170,7 +173,7 @@ class TestListActionPlan(api_base.FunctionalTest):
def test_detail_against_single(self):
action_plan = obj_utils.create_test_action_plan(self.context)
response = self.get_json(
'/action_plan/%s/detail' % action_plan['uuid'],
'/action_plan/{}/detail'.format(action_plan['uuid']),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -208,10 +211,10 @@ class TestListActionPlan(api_base.FunctionalTest):
new_state = objects.audit.State.CANCELLED
self.patch_json(
'/audits/%s' % audit1.uuid,
f'/audits/{audit1.uuid}',
[{'path': '/state', 'value': new_state,
'op': 'replace'}])
self.delete('/audits/%s' % audit1.uuid)
self.delete(f'/audits/{audit1.uuid}')
response = self.get_json('/action_plans')
@@ -261,7 +264,7 @@ class TestListActionPlan(api_base.FunctionalTest):
audit_id=audit2.id)
action_plan_list2.append(action_plan.uuid)
response = self.get_json('/action_plans?audit_uuid=%s' % audit2.uuid)
response = self.get_json(f'/action_plans?audit_uuid={audit2.uuid}')
self.assertEqual(len(action_plan_list2), len(response['action_plans']))
for action in response['action_plans']:
self.assertEqual(audit2.uuid, action['audit_uuid'])
@@ -317,14 +320,14 @@ class TestListActionPlan(api_base.FunctionalTest):
def test_sort_key_validation(self):
response = self.get_json(
'/action_plans?sort_key=%s' % 'bad_name',
'/action_plans?sort_key={}'.format('bad_name'),
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
def test_links(self):
uuid = utils.generate_uuid()
obj_utils.create_test_action_plan(self.context, id=1, uuid=uuid)
response = self.get_json('/action_plans/%s' % uuid)
response = self.get_json(f'/action_plans/{uuid}')
self.assertIn('links', response.keys())
self.assertEqual(2, len(response['links']))
self.assertIn(uuid, response['links'][0]['href'])
@@ -375,15 +378,15 @@ class TestDelete(api_base.FunctionalTest):
action_plan.destroy()
def test_delete_action_plan_without_action(self):
response = self.delete('/action_plans/%s' % self.action_plan.uuid,
response = self.delete(f'/action_plans/{self.action_plan.uuid}',
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
self.action_plan.state = objects.action_plan.State.SUCCEEDED
self.action_plan.save()
self.delete('/action_plans/%s' % self.action_plan.uuid)
response = self.get_json('/action_plans/%s' % self.action_plan.uuid,
self.delete(f'/action_plans/{self.action_plan.uuid}')
response = self.get_json(f'/action_plans/{self.action_plan.uuid}',
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
self.assertEqual('application/json', response.content_type)
@@ -395,13 +398,13 @@ class TestDelete(api_base.FunctionalTest):
self.action_plan.state = objects.action_plan.State.SUCCEEDED
self.action_plan.save()
self.delete('/action_plans/%s' % self.action_plan.uuid)
ap_response = self.get_json('/action_plans/%s' % self.action_plan.uuid,
self.delete(f'/action_plans/{self.action_plan.uuid}')
ap_response = self.get_json(f'/action_plans/{self.action_plan.uuid}',
expect_errors=True)
acts_response = self.get_json(
'/actions/?action_plan_uuid=%s' % self.action_plan.uuid)
f'/actions/?action_plan_uuid={self.action_plan.uuid}')
act_response = self.get_json(
'/actions/%s' % action.uuid,
f'/actions/{action.uuid}',
expect_errors=True)
# The action plan does not exist anymore
@@ -417,7 +420,7 @@ class TestDelete(api_base.FunctionalTest):
def test_delete_action_plan_not_found(self):
uuid = utils.generate_uuid()
response = self.delete('/action_plans/%s' % uuid, expect_errors=True)
response = self.delete(f'/action_plans/{uuid}', expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
@@ -446,8 +449,8 @@ class TestStart(api_base.FunctionalTest):
def test_start_action_plan_not_found(self, mock_policy):
mock_policy.return_value = True
uuid = utils.generate_uuid()
response = self.post('/v1/action_plans/%s/%s' %
(uuid, 'start'), expect_errors=True)
response = self.post(
'/v1/action_plans/{}/{}'.format(uuid, 'start'), expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
@@ -458,12 +461,12 @@ class TestStart(api_base.FunctionalTest):
action = obj_utils.create_test_action(
self.context, id=1)
self.action_plan.state = objects.action_plan.State.SUCCEEDED
response = self.post('/v1/action_plans/%s/%s/'
% (self.action_plan.uuid, 'start'),
expect_errors=True)
response = self.post(
'/v1/action_plans/{}/{}/'.format(self.action_plan.uuid, 'start'),
expect_errors=True)
self.assertEqual(HTTPStatus.OK, response.status_int)
act_response = self.get_json(
'/actions/%s' % action.uuid,
f'/actions/{action.uuid}',
expect_errors=True)
self.assertEqual(HTTPStatus.OK, act_response.status_int)
self.assertEqual('PENDING', act_response.json['state'])
@@ -496,11 +499,11 @@ class TestPatch(api_base.FunctionalTest):
new_state = objects.action_plan.State.DELETED
response = self.get_json(
'/action_plans/%s' % self.action_plan.uuid)
f'/action_plans/{self.action_plan.uuid}')
self.assertNotEqual(new_state, response['state'])
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
f'/action_plans/{self.action_plan.uuid}',
[{'path': '/state', 'value': new_state, 'op': 'replace'}],
expect_errors=True)
@@ -510,7 +513,7 @@ class TestPatch(api_base.FunctionalTest):
def test_replace_non_existent_action_plan_denied(self):
response = self.patch_json(
'/action_plans/%s' % utils.generate_uuid(),
f'/action_plans/{utils.generate_uuid()}',
[{'path': '/state',
'value': objects.action_plan.State.PENDING,
'op': 'replace'}],
@@ -521,7 +524,7 @@ class TestPatch(api_base.FunctionalTest):
def test_add_non_existent_property_denied(self):
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
f'/action_plans/{self.action_plan.uuid}',
[{'path': '/foo', 'value': 'bar', 'op': 'add'}],
expect_errors=True)
self.assertEqual('application/json', response.content_type)
@@ -531,11 +534,11 @@ class TestPatch(api_base.FunctionalTest):
def test_remove_denied(self):
# We should not be able to remove the state of an action plan
response = self.get_json(
'/action_plans/%s' % self.action_plan.uuid)
f'/action_plans/{self.action_plan.uuid}')
self.assertIsNotNone(response['state'])
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
f'/action_plans/{self.action_plan.uuid}',
[{'path': '/state', 'op': 'remove'}],
expect_errors=True)
@@ -545,7 +548,7 @@ class TestPatch(api_base.FunctionalTest):
def test_remove_uuid_denied(self):
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
f'/action_plans/{self.action_plan.uuid}',
[{'path': '/uuid', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
@@ -554,7 +557,7 @@ class TestPatch(api_base.FunctionalTest):
def test_remove_non_existent_property_denied(self):
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
f'/action_plans/{self.action_plan.uuid}',
[{'path': '/non-existent', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_code)
@@ -565,10 +568,10 @@ class TestPatch(api_base.FunctionalTest):
def test_replace_state_pending_ok(self, applier_mock):
new_state = objects.action_plan.State.PENDING
response = self.get_json(
'/action_plans/%s' % self.action_plan.uuid)
f'/action_plans/{self.action_plan.uuid}')
self.assertNotEqual(new_state, response['state'])
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
f'/action_plans/{self.action_plan.uuid}',
[{'path': '/state', 'value': new_state,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
@@ -578,7 +581,7 @@ class TestPatch(api_base.FunctionalTest):
def test_replace_status_message_denied(self):
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
f'/action_plans/{self.action_plan.uuid}',
[{'path': '/status_message', 'value': 'test', 'op': 'replace'}],
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_code)
@@ -587,7 +590,7 @@ class TestPatch(api_base.FunctionalTest):
def test_add_status_message_denied(self):
response = self.patch_json(
'/action_plans/%s' % self.action_plan.uuid,
f'/action_plans/{self.action_plan.uuid}',
[{'path': '/status_message', 'value': 'test', 'op': 'add'}],
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_code)
@@ -616,7 +619,7 @@ class TestPatchStateTransitionDenied(api_base.FunctionalTest):
scenarios = [
(
"{} -> {}".format(original_state, new_state),
f"{original_state} -> {new_state}",
{"original_state": original_state,
"new_state": new_state},
)
@@ -647,13 +650,13 @@ class TestPatchStateTransitionDenied(api_base.FunctionalTest):
action_plan = obj_utils.create_test_action_plan(
self.context, state=self.original_state)
initial_ap = self.get_json('/action_plans/%s' % action_plan.uuid)
initial_ap = self.get_json(f'/action_plans/{action_plan.uuid}')
response = self.patch_json(
'/action_plans/%s' % action_plan.uuid,
f'/action_plans/{action_plan.uuid}',
[{'path': '/state', 'value': self.new_state,
'op': 'replace'}],
expect_errors=True)
updated_ap = self.get_json('/action_plans/%s' % action_plan.uuid)
updated_ap = self.get_json(f'/action_plans/{action_plan.uuid}')
self.assertNotEqual(self.new_state, initial_ap['state'])
self.assertEqual(self.original_state, updated_ap['state'])
@@ -687,12 +690,12 @@ class TestPatchStateTransitionOk(api_base.FunctionalTest):
action_plan = obj_utils.create_test_action_plan(
self.context, state=self.original_state)
initial_ap = self.get_json('/action_plans/%s' % action_plan.uuid)
initial_ap = self.get_json(f'/action_plans/{action_plan.uuid}')
response = self.patch_json(
'/action_plans/%s' % action_plan.uuid,
f'/action_plans/{action_plan.uuid}',
[{'path': '/state', 'value': self.new_state, 'op': 'replace'}])
updated_ap = self.get_json('/action_plans/%s' % action_plan.uuid)
updated_ap = self.get_json(f'/action_plans/{action_plan.uuid}')
self.assertNotEqual(self.new_state, initial_ap['state'])
self.assertEqual(self.new_state, updated_ap['state'])
self.assertEqual('application/json', response.content_type)
@@ -716,7 +719,7 @@ class TestActionPlanPolicyEnforcement(api_base.FunctionalTest):
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
f"Policy doesn't allow {rule} to be performed.",
jsonutils.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
@@ -728,7 +731,7 @@ class TestActionPlanPolicyEnforcement(api_base.FunctionalTest):
action_plan = obj_utils.create_test_action_plan(self.context)
self._common_policy_check(
"action_plan:get", self.get_json,
'/action_plans/%s' % action_plan.uuid,
f'/action_plans/{action_plan.uuid}',
expect_errors=True)
def test_policy_disallow_detail(self):
@@ -741,7 +744,7 @@ class TestActionPlanPolicyEnforcement(api_base.FunctionalTest):
action_plan = obj_utils.create_test_action_plan(self.context)
self._common_policy_check(
"action_plan:update", self.patch_json,
'/action_plans/%s' % action_plan.uuid,
f'/action_plans/{action_plan.uuid}',
[{'path': '/state',
'value': objects.action_plan.State.DELETED,
'op': 'replace'}],
@@ -751,7 +754,7 @@ class TestActionPlanPolicyEnforcement(api_base.FunctionalTest):
action_plan = obj_utils.create_test_action_plan(self.context)
self._common_policy_check(
"action_plan:delete", self.delete,
'/action_plans/%s' % action_plan.uuid, expect_errors=True)
f'/action_plans/{action_plan.uuid}', expect_errors=True)
class TestActionPlanPolicyEnforcementWithAdminContext(TestListActionPlan,

View File

@@ -104,14 +104,14 @@ class TestListAuditTemplate(FunctionalTestWithSetup):
def test_get_one_by_uuid(self):
audit_template = obj_utils.create_test_audit_template(self.context)
response = self.get_json(
'/audit_templates/%s' % audit_template['uuid'])
'/audit_templates/{}'.format(audit_template['uuid']))
self.assertEqual(audit_template.uuid, response['uuid'])
self._assert_audit_template_fields(response)
def test_get_one_by_name(self):
audit_template = obj_utils.create_test_audit_template(self.context)
response = self.get_json(urlparse.quote(
'/audit_templates/%s' % audit_template['name']))
'/audit_templates/{}'.format(audit_template['name'])))
self.assertEqual(audit_template.uuid, response['uuid'])
self._assert_audit_template_fields(response)
@@ -119,13 +119,13 @@ class TestListAuditTemplate(FunctionalTestWithSetup):
audit_template = obj_utils.create_test_audit_template(self.context)
audit_template.soft_delete()
response = self.get_json(
'/audit_templates/%s' % audit_template['uuid'],
'/audit_templates/{}'.format(audit_template['uuid']),
headers={'X-Show-Deleted': 'True'})
self.assertEqual(audit_template.uuid, response['uuid'])
self._assert_audit_template_fields(response)
response = self.get_json(
'/audit_templates/%s' % audit_template['uuid'],
'/audit_templates/{}'.format(audit_template['uuid']),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -151,7 +151,7 @@ class TestListAuditTemplate(FunctionalTestWithSetup):
def test_detail_against_single(self):
audit_template = obj_utils.create_test_audit_template(self.context)
response = self.get_json(
'/audit_templates/%s/detail' % audit_template['uuid'],
'/audit_templates/{}/detail'.format(audit_template['uuid']),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -212,7 +212,7 @@ class TestListAuditTemplate(FunctionalTestWithSetup):
def test_links(self):
uuid = utils.generate_uuid()
obj_utils.create_test_audit_template(self.context, id=1, uuid=uuid)
response = self.get_json('/audit_templates/%s' % uuid)
response = self.get_json(f'/audit_templates/{uuid}')
self.assertIn('links', response.keys())
self.assertEqual(2, len(response['links']))
self.assertIn(uuid, response['links'][0]['href'])
@@ -254,7 +254,7 @@ class TestListAuditTemplate(FunctionalTestWithSetup):
goal_id=goal_id)
response = self.get_json(
'/audit_templates?goal=%s' % self.fake_goal2.uuid)
f'/audit_templates?goal={self.fake_goal2.uuid}')
self.assertEqual(2, len(response['audit_templates']))
def test_filter_by_goal_name(self):
@@ -267,7 +267,7 @@ class TestListAuditTemplate(FunctionalTestWithSetup):
goal_id=goal_id)
response = self.get_json(
'/audit_templates?goal=%s' % self.fake_goal2.name)
f'/audit_templates?goal={self.fake_goal2.name}')
self.assertEqual(2, len(response['audit_templates']))
def test_filter_by_strategy_uuid(self):
@@ -280,7 +280,7 @@ class TestListAuditTemplate(FunctionalTestWithSetup):
strategy_id=strategy_id)
response = self.get_json(
'/audit_templates?strategy=%s' % self.fake_strategy2.uuid)
f'/audit_templates?strategy={self.fake_strategy2.uuid}')
self.assertEqual(2, len(response['audit_templates']))
def test_filter_by_strategy_name(self):
@@ -293,7 +293,7 @@ class TestListAuditTemplate(FunctionalTestWithSetup):
strategy_id=strategy_id)
response = self.get_json(
'/audit_templates?strategy=%s' % self.fake_strategy2.name)
f'/audit_templates?strategy={self.fake_strategy2.name}')
self.assertEqual(2, len(response['audit_templates']))
def test_many_with_sort_key_name(self):
@@ -304,7 +304,7 @@ class TestListAuditTemplate(FunctionalTestWithSetup):
name=f'My Audit Template {id_}')
audit_template_list.append(audit_template)
response = self.get_json('/audit_templates?sort_key=%s' % 'name')
response = self.get_json('/audit_templates?sort_key={}'.format('name'))
names = [s['name'] for s in response['audit_templates']]
@@ -336,7 +336,7 @@ class TestListAuditTemplate(FunctionalTestWithSetup):
def test_sort_key_validation(self):
response = self.get_json(
'/audit_templates?sort_key=%s' % 'goal_bad_name',
'/audit_templates?sort_key={}'.format('goal_bad_name'),
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
@@ -356,18 +356,18 @@ class TestPatch(FunctionalTestWithSetup):
new_goal_uuid = self.fake_goal2.uuid
response = self.get_json(
'/audit_templates/%s' % self.audit_template.uuid)
f'/audit_templates/{self.audit_template.uuid}')
self.assertNotEqual(new_goal_uuid, response['goal_uuid'])
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
f'/audit_templates/{self.audit_template.uuid}',
[{'path': '/goal', 'value': new_goal_uuid,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.OK, response.status_code)
response = self.get_json(
'/audit_templates/%s' % self.audit_template.uuid)
f'/audit_templates/{self.audit_template.uuid}')
self.assertEqual(new_goal_uuid, response['goal_uuid'])
return_updated_at = timeutils.parse_isotime(
response['updated_at']).replace(tzinfo=None)
@@ -380,18 +380,18 @@ class TestPatch(FunctionalTestWithSetup):
new_goal_uuid = self.fake_goal2.uuid
response = self.get_json(urlparse.quote(
'/audit_templates/%s' % self.audit_template.name))
f'/audit_templates/{self.audit_template.name}'))
self.assertNotEqual(new_goal_uuid, response['goal_uuid'])
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.name,
f'/audit_templates/{self.audit_template.name}',
[{'path': '/goal', 'value': new_goal_uuid,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.OK, response.status_code)
response = self.get_json(
'/audit_templates/%s' % self.audit_template.name)
f'/audit_templates/{self.audit_template.name}')
self.assertEqual(new_goal_uuid, response['goal_uuid'])
return_updated_at = timeutils.parse_isotime(
response['updated_at']).replace(tzinfo=None)
@@ -399,7 +399,7 @@ class TestPatch(FunctionalTestWithSetup):
def test_replace_non_existent_audit_template(self):
response = self.patch_json(
'/audit_templates/%s' % utils.generate_uuid(),
f'/audit_templates/{utils.generate_uuid()}',
[{'path': '/goal', 'value': self.fake_goal1.uuid,
'op': 'replace'}],
expect_errors=True)
@@ -414,7 +414,7 @@ class TestPatch(FunctionalTestWithSetup):
wraps=self.dbapi.update_audit_template
) as cn_mock:
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
f'/audit_templates/{self.audit_template.uuid}',
[{'path': '/goal', 'value': utils.generate_uuid(),
'op': 'replace'}],
expect_errors=True)
@@ -423,7 +423,7 @@ class TestPatch(FunctionalTestWithSetup):
def test_add_goal_uuid(self):
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
f'/audit_templates/{self.audit_template.uuid}',
[{'path': '/goal',
'value': self.fake_goal2.uuid,
'op': 'add'}])
@@ -431,12 +431,12 @@ class TestPatch(FunctionalTestWithSetup):
self.assertEqual(HTTPStatus.OK, response.status_int)
response = self.get_json(
'/audit_templates/%s' % self.audit_template.uuid)
f'/audit_templates/{self.audit_template.uuid}')
self.assertEqual(self.fake_goal2.uuid, response['goal_uuid'])
def test_add_strategy_uuid(self):
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
f'/audit_templates/{self.audit_template.uuid}',
[{'path': '/strategy',
'value': self.fake_strategy1.uuid,
'op': 'add'}])
@@ -444,12 +444,12 @@ class TestPatch(FunctionalTestWithSetup):
self.assertEqual(HTTPStatus.OK, response.status_int)
response = self.get_json(
'/audit_templates/%s' % self.audit_template.uuid)
f'/audit_templates/{self.audit_template.uuid}')
self.assertEqual(self.fake_strategy1.uuid, response['strategy_uuid'])
def test_replace_strategy_uuid(self):
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
f'/audit_templates/{self.audit_template.uuid}',
[{'path': '/strategy',
'value': self.fake_strategy2['uuid'],
'op': 'replace'}])
@@ -457,13 +457,13 @@ class TestPatch(FunctionalTestWithSetup):
self.assertEqual(HTTPStatus.OK, response.status_int)
response = self.get_json(
'/audit_templates/%s' % self.audit_template.uuid)
f'/audit_templates/{self.audit_template.uuid}')
self.assertEqual(
self.fake_strategy2['uuid'], response['strategy_uuid'])
def test_replace_invalid_strategy(self):
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
f'/audit_templates/{self.audit_template.uuid}',
[{'path': '/strategy',
'value': utils.generate_uuid(), # Does not exist
'op': 'replace'}], expect_errors=True)
@@ -473,7 +473,7 @@ class TestPatch(FunctionalTestWithSetup):
def test_add_non_existent_property(self):
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
f'/audit_templates/{self.audit_template.uuid}',
[{'path': '/foo', 'value': 'bar', 'op': 'add'}],
expect_errors=True)
self.assertEqual('application/json', response.content_type)
@@ -483,26 +483,26 @@ class TestPatch(FunctionalTestWithSetup):
def test_remove_strategy(self):
audit_template = obj_utils.create_test_audit_template(
self.context, uuid=utils.generate_uuid(),
name="AT_%s" % utils.generate_uuid(),
name=f"AT_{utils.generate_uuid()}",
goal_id=self.fake_goal1.id,
strategy_id=self.fake_strategy1.id)
response = self.get_json(
'/audit_templates/%s' % audit_template.uuid)
f'/audit_templates/{audit_template.uuid}')
self.assertIsNotNone(response['strategy_uuid'])
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
f'/audit_templates/{self.audit_template.uuid}',
[{'path': '/strategy', 'op': 'remove'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.OK, response.status_code)
def test_remove_goal(self):
response = self.get_json(
'/audit_templates/%s' % self.audit_template.uuid)
f'/audit_templates/{self.audit_template.uuid}')
self.assertIsNotNone(response['goal_uuid'])
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
f'/audit_templates/{self.audit_template.uuid}',
[{'path': '/goal', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_code)
@@ -511,7 +511,7 @@ class TestPatch(FunctionalTestWithSetup):
def test_remove_uuid(self):
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
f'/audit_templates/{self.audit_template.uuid}',
[{'path': '/uuid', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
@@ -520,7 +520,7 @@ class TestPatch(FunctionalTestWithSetup):
def test_remove_non_existent_property(self):
response = self.patch_json(
'/audit_templates/%s' % self.audit_template.uuid,
f'/audit_templates/{self.audit_template.uuid}',
[{'path': '/non-existent', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_code)
@@ -544,7 +544,7 @@ class TestPost(FunctionalTestWithSetup):
# Check location header
self.assertIsNotNone(response.location)
expected_location = \
'/v1/audit_templates/%s' % response.json['uuid']
'/v1/audit_templates/{}'.format(response.json['uuid'])
self.assertEqual(urlparse.urlparse(response.location).path,
expected_location)
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@@ -571,7 +571,7 @@ class TestPost(FunctionalTestWithSetup):
# Check location header
self.assertIsNotNone(response.location)
expected_location = \
'/v1/audit_templates/%s' % response.json['uuid']
'/v1/audit_templates/{}'.format(response.json['uuid'])
self.assertEqual(urlparse.urlparse(response.location).path,
expected_location)
self.assertTrue(utils.is_uuid_like(response.json['uuid']))
@@ -728,10 +728,10 @@ class TestDelete(api_base.FunctionalTest):
def test_delete_audit_template_by_uuid(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
self.delete(urlparse.quote('/audit_templates/%s' %
self.audit_template.uuid))
self.delete(urlparse.quote(
f'/audit_templates/{self.audit_template.uuid}'))
response = self.get_json(
urlparse.quote('/audit_templates/%s' % self.audit_template.uuid),
urlparse.quote(f'/audit_templates/{self.audit_template.uuid}'),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
self.assertEqual('application/json', response.content_type)
@@ -751,10 +751,10 @@ class TestDelete(api_base.FunctionalTest):
def test_delete_audit_template_by_name(self, mock_utcnow):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
self.delete(urlparse.quote('/audit_templates/%s' %
self.audit_template.name))
self.delete(urlparse.quote(
f'/audit_templates/{self.audit_template.name}'))
response = self.get_json(
urlparse.quote('/audit_templates/%s' % self.audit_template.name),
urlparse.quote(f'/audit_templates/{self.audit_template.name}'),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
self.assertEqual('application/json', response.content_type)
@@ -773,7 +773,7 @@ class TestDelete(api_base.FunctionalTest):
def test_delete_audit_template_not_found(self):
uuid = utils.generate_uuid()
response = self.delete(
'/audit_templates/%s' % uuid, expect_errors=True)
f'/audit_templates/{uuid}', expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
@@ -790,7 +790,7 @@ class TestAuditTemplatePolicyEnforcement(api_base.FunctionalTest):
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
f"Policy doesn't allow {rule} to be performed.",
jsonutils.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
@@ -803,7 +803,7 @@ class TestAuditTemplatePolicyEnforcement(api_base.FunctionalTest):
audit_template = obj_utils.create_test_audit_template(self.context)
self._common_policy_check(
"audit_template:get", self.get_json,
'/audit_templates/%s' % audit_template.uuid,
f'/audit_templates/{audit_template.uuid}',
expect_errors=True)
def test_policy_disallow_detail(self):
@@ -817,7 +817,7 @@ class TestAuditTemplatePolicyEnforcement(api_base.FunctionalTest):
audit_template = obj_utils.create_test_audit_template(self.context)
self._common_policy_check(
"audit_template:update", self.patch_json,
'/audit_templates/%s' % audit_template.uuid,
f'/audit_templates/{audit_template.uuid}',
[{'path': '/state', 'value': objects.audit.State.SUCCEEDED,
'op': 'replace'}], expect_errors=True)
@@ -842,7 +842,7 @@ class TestAuditTemplatePolicyEnforcement(api_base.FunctionalTest):
audit_template = obj_utils.create_test_audit_template(self.context)
self._common_policy_check(
"audit_template:delete", self.delete,
'/audit_templates/%s' % audit_template.uuid, expect_errors=True)
f'/audit_templates/{audit_template.uuid}', expect_errors=True)
class TestAuditTemplatePolicyWithAdminContext(TestListAuditTemplate,

View File

@@ -128,7 +128,7 @@ class TestListAudit(api_base.FunctionalTest):
def test_get_one(self):
audit = obj_utils.create_test_audit(self.context)
response = self.get_json('/audits/%s' % audit['uuid'])
response = self.get_json('/audits/{}'.format(audit['uuid']))
self.assertEqual(audit.uuid, response['uuid'])
self._assert_audit_fields(response)
self.assertNotIn('status_message', response)
@@ -137,7 +137,7 @@ class TestListAudit(api_base.FunctionalTest):
audit = obj_utils.create_test_audit(
self.context, status_message='Fake message')
response = self.get_json(
'/audits/%s' % audit['uuid'],
'/audits/{}'.format(audit['uuid']),
headers={'OpenStack-API-Version': 'infra-optim 1.5'})
self.assertEqual(audit.uuid, response['uuid'])
self._assert_audit_fields(response)
@@ -147,7 +147,7 @@ class TestListAudit(api_base.FunctionalTest):
audit = obj_utils.create_test_audit(
self.context, status_message='Fake message')
response = self.get_json(
'/audits/%s' % audit['uuid'],
'/audits/{}'.format(audit['uuid']),
headers={'OpenStack-API-Version': 'infra-optim 1.4'})
self.assertEqual(audit.uuid, response['uuid'])
self._assert_audit_fields(response)
@@ -157,7 +157,7 @@ class TestListAudit(api_base.FunctionalTest):
audit = obj_utils.create_test_audit(
self.context)
response = self.get_json(
'/audits/%s' % audit['uuid'],
'/audits/{}'.format(audit['uuid']),
headers={'OpenStack-API-Version': 'infra-optim 1.5'})
self.assertEqual(audit.uuid, response['uuid'])
self.assertIsNone(response['status_message'])
@@ -165,12 +165,12 @@ class TestListAudit(api_base.FunctionalTest):
def test_get_one_soft_deleted(self):
audit = obj_utils.create_test_audit(self.context)
audit.soft_delete()
response = self.get_json('/audits/%s' % audit['uuid'],
response = self.get_json('/audits/{}'.format(audit['uuid']),
headers={'X-Show-Deleted': 'True'})
self.assertEqual(audit.uuid, response['uuid'])
self._assert_audit_fields(response)
response = self.get_json('/audits/%s' % audit['uuid'],
response = self.get_json('/audits/{}'.format(audit['uuid']),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -205,7 +205,7 @@ class TestListAudit(api_base.FunctionalTest):
def test_detail_against_single(self):
audit = obj_utils.create_test_audit(self.context)
response = self.get_json('/audits/%s/detail' % audit['uuid'],
response = self.get_json('/audits/{}/detail'.format(audit['uuid']),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -277,7 +277,7 @@ class TestListAudit(api_base.FunctionalTest):
def test_sort_key_validation(self):
response = self.get_json(
'/audits?sort_key=%s' % 'bad_name',
'/audits?sort_key={}'.format('bad_name'),
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
@@ -286,7 +286,7 @@ class TestListAudit(api_base.FunctionalTest):
obj_utils.create_test_audit(
self.context, id=1, uuid=uuid,
name=f'My Audit {1}')
response = self.get_json('/audits/%s' % uuid)
response = self.get_json(f'/audits/{uuid}')
self.assertIn('links', response.keys())
self.assertEqual(2, len(response['links']))
self.assertIn(uuid, response['links'][0]['href'])
@@ -342,17 +342,17 @@ class TestPatch(api_base.FunctionalTest):
mock_utcnow.return_value = test_time
new_state = objects.audit.State.CANCELLED
response = self.get_json('/audits/%s' % self.audit.uuid)
response = self.get_json(f'/audits/{self.audit.uuid}')
self.assertNotEqual(new_state, response['state'])
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
f'/audits/{self.audit.uuid}',
[{'path': '/state', 'value': new_state,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.OK, response.status_code)
response = self.get_json('/audits/%s' % self.audit.uuid)
response = self.get_json(f'/audits/{self.audit.uuid}')
self.assertEqual(new_state, response['state'])
return_updated_at = timeutils.parse_isotime(
response['updated_at']).replace(tzinfo=None)
@@ -360,7 +360,7 @@ class TestPatch(api_base.FunctionalTest):
def test_replace_non_existent_audit(self):
response = self.patch_json(
'/audits/%s' % utils.generate_uuid(),
f'/audits/{utils.generate_uuid()}',
[{'path': '/state', 'value': objects.audit.State.SUCCEEDED,
'op': 'replace'}], expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -369,7 +369,7 @@ class TestPatch(api_base.FunctionalTest):
def test_replace_status_message_denied(self):
response = self.patch_json(
'/audits/%s' % utils.generate_uuid(),
f'/audits/{utils.generate_uuid()}',
[{'path': '/status_message', 'value': 'test', 'op': 'replace'}],
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_code)
@@ -379,17 +379,17 @@ class TestPatch(api_base.FunctionalTest):
def test_add_ok(self):
new_state = objects.audit.State.SUCCEEDED
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
f'/audits/{self.audit.uuid}',
[{'path': '/state', 'value': new_state, 'op': 'add'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.OK, response.status_int)
response = self.get_json('/audits/%s' % self.audit.uuid)
response = self.get_json(f'/audits/{self.audit.uuid}')
self.assertEqual(new_state, response['state'])
def test_add_non_existent_property(self):
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
f'/audits/{self.audit.uuid}',
[{'path': '/foo', 'value': 'bar', 'op': 'add'}],
expect_errors=True)
self.assertEqual('application/json', response.content_type)
@@ -397,19 +397,19 @@ class TestPatch(api_base.FunctionalTest):
self.assertTrue(response.json['error_message'])
def test_remove_ok(self):
response = self.get_json('/audits/%s' % self.audit.uuid)
response = self.get_json(f'/audits/{self.audit.uuid}')
self.assertIsNotNone(response['interval'])
response = self.patch_json('/audits/%s' % self.audit.uuid,
response = self.patch_json(f'/audits/{self.audit.uuid}',
[{'path': '/interval', 'op': 'remove'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.OK, response.status_code)
response = self.get_json('/audits/%s' % self.audit.uuid)
response = self.get_json(f'/audits/{self.audit.uuid}')
self.assertIsNone(response['interval'])
def test_remove_uuid(self):
response = self.patch_json('/audits/%s' % self.audit.uuid,
response = self.patch_json(f'/audits/{self.audit.uuid}',
[{'path': '/uuid', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
@@ -418,7 +418,7 @@ class TestPatch(api_base.FunctionalTest):
def test_remove_non_existent_property(self):
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
f'/audits/{self.audit.uuid}',
[{'path': '/non-existent', 'op': 'remove'}],
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_code)
@@ -442,7 +442,7 @@ class TestPatchStateTransitionDenied(api_base.FunctionalTest):
scenarios = [
(
"{} -> {}".format(original_state, new_state),
f"{original_state} -> {new_state}",
{"original_state": original_state,
"new_state": new_state},
)
@@ -470,11 +470,11 @@ class TestPatchStateTransitionDenied(api_base.FunctionalTest):
return audit
def test_replace_denied(self):
response = self.get_json('/audits/%s' % self.audit.uuid)
response = self.get_json(f'/audits/{self.audit.uuid}')
self.assertNotEqual(self.new_state, response['state'])
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
f'/audits/{self.audit.uuid}',
[{'path': '/state', 'value': self.new_state,
'op': 'replace'}],
expect_errors=True)
@@ -482,7 +482,7 @@ class TestPatchStateTransitionDenied(api_base.FunctionalTest):
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_code)
self.assertTrue(response.json['error_message'])
response = self.get_json('/audits/%s' % self.audit.uuid)
response = self.get_json(f'/audits/{self.audit.uuid}')
self.assertEqual(self.original_state, response['state'])
@@ -518,17 +518,17 @@ class TestPatchStateTransitionOk(api_base.FunctionalTest):
test_time = datetime.datetime(2000, 1, 1, 0, 0)
mock_utcnow.return_value = test_time
response = self.get_json('/audits/%s' % self.audit.uuid)
response = self.get_json(f'/audits/{self.audit.uuid}')
self.assertNotEqual(self.new_state, response['state'])
response = self.patch_json(
'/audits/%s' % self.audit.uuid,
f'/audits/{self.audit.uuid}',
[{'path': '/state', 'value': self.new_state,
'op': 'replace'}])
self.assertEqual('application/json', response.content_type)
self.assertEqual(HTTPStatus.OK, response.status_code)
response = self.get_json('/audits/%s' % self.audit.uuid)
response = self.get_json(f'/audits/{self.audit.uuid}')
self.assertEqual(self.new_state, response['state'])
return_updated_at = timeutils.parse_isotime(
response['updated_at']).replace(tzinfo=None)
@@ -607,7 +607,7 @@ class TestPost(TestPostBase):
self.assertEqual(HTTPStatus.CREATED, response.status_int)
# Check location header
self.assertIsNotNone(response.location)
expected_location = '/v1/audits/%s' % response.json['uuid']
expected_location = '/v1/audits/{}'.format(response.json['uuid'])
self.assertEqual(urlparse.urlparse(response.location).path,
expected_location)
self.assertEqual(objects.audit.State.PENDING,
@@ -1121,10 +1121,10 @@ class TestDelete(api_base.FunctionalTest):
new_state = objects.audit.State.ONGOING
self.patch_json(
'/audits/%s' % self.audit.uuid,
f'/audits/{self.audit.uuid}',
[{'path': '/state', 'value': new_state,
'op': 'replace'}])
response = self.delete('/audits/%s' % self.audit.uuid,
response = self.delete(f'/audits/{self.audit.uuid}',
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
self.assertEqual('application/json', response.content_type)
@@ -1132,11 +1132,11 @@ class TestDelete(api_base.FunctionalTest):
new_state = objects.audit.State.CANCELLED
self.patch_json(
'/audits/%s' % self.audit.uuid,
f'/audits/{self.audit.uuid}',
[{'path': '/state', 'value': new_state,
'op': 'replace'}])
self.delete('/audits/%s' % self.audit.uuid)
response = self.get_json('/audits/%s' % self.audit.uuid,
self.delete(f'/audits/{self.audit.uuid}')
response = self.get_json(f'/audits/{self.audit.uuid}',
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
self.assertEqual('application/json', response.content_type)
@@ -1153,7 +1153,7 @@ class TestDelete(api_base.FunctionalTest):
def test_delete_audit_not_found(self):
uuid = utils.generate_uuid()
response = self.delete('/audits/%s' % uuid, expect_errors=True)
response = self.delete(f'/audits/{uuid}', expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(response.json['error_message'])
@@ -1174,7 +1174,7 @@ class TestAuditPolicyEnforcement(api_base.FunctionalTest):
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
f"Policy doesn't allow {rule} to be performed.",
jsonutils.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
@@ -1186,7 +1186,7 @@ class TestAuditPolicyEnforcement(api_base.FunctionalTest):
audit = obj_utils.create_test_audit(self.context)
self._common_policy_check(
"audit:get", self.get_json,
'/audits/%s' % audit.uuid,
f'/audits/{audit.uuid}',
expect_errors=True)
def test_policy_disallow_detail(self):
@@ -1199,7 +1199,7 @@ class TestAuditPolicyEnforcement(api_base.FunctionalTest):
audit = obj_utils.create_test_audit(self.context)
self._common_policy_check(
"audit:update", self.patch_json,
'/audits/%s' % audit.uuid,
f'/audits/{audit.uuid}',
[{'path': '/state', 'value': objects.audit.State.SUCCEEDED,
'op': 'replace'}], expect_errors=True)
@@ -1216,7 +1216,7 @@ class TestAuditPolicyEnforcement(api_base.FunctionalTest):
audit = obj_utils.create_test_audit(self.context)
self._common_policy_check(
"audit:delete", self.delete,
'/audits/%s' % audit.uuid, expect_errors=True)
f'/audits/{audit.uuid}', expect_errors=True)
class TestAuditEnforcementWithAdminContext(TestListAudit,

View File

@@ -163,7 +163,7 @@ class TestDataModelPolicyEnforcement(api_base.FunctionalTest):
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
f"Policy doesn't allow {rule} to be performed.",
jsonutils.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):

View File

@@ -36,7 +36,7 @@ class TestListGoal(api_base.FunctionalTest):
def test_get_one_by_uuid(self):
goal = obj_utils.create_test_goal(self.context)
response = self.get_json('/goals/%s' % goal.uuid)
response = self.get_json(f'/goals/{goal.uuid}')
self.assertEqual(goal.uuid, response["uuid"])
self.assertEqual(goal.name, response["name"])
self._assert_goal_fields(response)
@@ -44,7 +44,7 @@ class TestListGoal(api_base.FunctionalTest):
def test_get_one_by_name(self):
goal = obj_utils.create_test_goal(self.context)
response = self.get_json(urlparse.quote(
'/goals/%s' % goal['name']))
'/goals/{}'.format(goal['name'])))
self.assertEqual(goal.uuid, response['uuid'])
self._assert_goal_fields(response)
@@ -52,13 +52,13 @@ class TestListGoal(api_base.FunctionalTest):
goal = obj_utils.create_test_goal(self.context)
goal.soft_delete()
response = self.get_json(
'/goals/%s' % goal['uuid'],
'/goals/{}'.format(goal['uuid']),
headers={'X-Show-Deleted': 'True'})
self.assertEqual(goal.uuid, response['uuid'])
self._assert_goal_fields(response)
response = self.get_json(
'/goals/%s' % goal['uuid'],
'/goals/{}'.format(goal['uuid']),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -70,7 +70,7 @@ class TestListGoal(api_base.FunctionalTest):
def test_detail_against_single(self):
goal = obj_utils.create_test_goal(self.context)
response = self.get_json('/goals/%s/detail' % goal.uuid,
response = self.get_json(f'/goals/{goal.uuid}/detail',
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -138,7 +138,7 @@ class TestListGoal(api_base.FunctionalTest):
def test_sort_key_validation(self):
response = self.get_json(
'/goals?sort_key=%s' % 'bad_name',
'/goals?sort_key={}'.format('bad_name'),
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
@@ -154,7 +154,7 @@ class TestGoalPolicyEnforcement(api_base.FunctionalTest):
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
f"Policy doesn't allow {rule} to be performed.",
jsonutils.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
@@ -166,7 +166,7 @@ class TestGoalPolicyEnforcement(api_base.FunctionalTest):
goal = obj_utils.create_test_goal(self.context)
self._common_policy_check(
"goal:get", self.get_json,
'/goals/%s' % goal.uuid,
f'/goals/{goal.uuid}',
expect_errors=True)
def test_policy_disallow_detail(self):

View File

@@ -37,13 +37,13 @@ class TestListScoringEngine(api_base.FunctionalTest):
scoring_engine = obj_utils.create_test_scoring_engine(self.context)
scoring_engine.soft_delete()
response = self.get_json(
'/scoring_engines/%s' % scoring_engine['name'],
'/scoring_engines/{}'.format(scoring_engine['name']),
headers={'X-Show-Deleted': 'True'})
self.assertEqual(scoring_engine.name, response['name'])
self._assert_scoring_engine_fields(response)
response = self.get_json(
'/scoring_engines/%s' % scoring_engine['name'],
'/scoring_engines/{}'.format(scoring_engine['name']),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -62,7 +62,7 @@ class TestListScoringEngine(api_base.FunctionalTest):
def test_detail_against_single(self):
scoring_engine = obj_utils.create_test_scoring_engine(self.context)
response = self.get_json(
'/scoring_engines/%s/detail' % scoring_engine.id,
f'/scoring_engines/{scoring_engine.id}/detail',
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -130,7 +130,7 @@ class TestListScoringEngine(api_base.FunctionalTest):
def test_sort_key_validation(self):
response = self.get_json(
'/goals?sort_key=%s' % 'bad_name',
'/goals?sort_key={}'.format('bad_name'),
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
@@ -146,7 +146,7 @@ class TestScoringEnginePolicyEnforcement(api_base.FunctionalTest):
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
f"Policy doesn't allow {rule} to be performed.",
jsonutils.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
@@ -158,7 +158,7 @@ class TestScoringEnginePolicyEnforcement(api_base.FunctionalTest):
se = obj_utils.create_test_scoring_engine(self.context)
self._common_policy_check(
"scoring_engine:get", self.get_json,
'/scoring_engines/%s' % se.uuid,
f'/scoring_engines/{se.uuid}',
expect_errors=True)
def test_policy_disallow_detail(self):

View File

@@ -34,7 +34,7 @@ class TestListService(api_base.FunctionalTest):
def test_get_one_by_id(self):
service = obj_utils.create_test_service(self.context)
response = self.get_json('/services/%s' % service.id)
response = self.get_json(f'/services/{service.id}')
self.assertEqual(service.id, response["id"])
self.assertEqual(service.name, response["name"])
self._assert_service_fields(response)
@@ -42,7 +42,7 @@ class TestListService(api_base.FunctionalTest):
def test_get_one_by_name(self):
service = obj_utils.create_test_service(self.context)
response = self.get_json(urlparse.quote(
'/services/%s' % service['name']))
'/services/{}'.format(service['name'])))
self.assertEqual(service.id, response['id'])
self._assert_service_fields(response)
@@ -50,13 +50,13 @@ class TestListService(api_base.FunctionalTest):
service = obj_utils.create_test_service(self.context)
service.soft_delete()
response = self.get_json(
'/services/%s' % service['id'],
'/services/{}'.format(service['id']),
headers={'X-Show-Deleted': 'True'})
self.assertEqual(service.id, response['id'])
self._assert_service_fields(response)
response = self.get_json(
'/services/%s' % service['id'],
'/services/{}'.format(service['id']),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -73,7 +73,7 @@ class TestListService(api_base.FunctionalTest):
def test_detail_against_single(self):
service = obj_utils.create_test_service(self.context)
response = self.get_json('/services/%s/detail' % service.id,
response = self.get_json(f'/services/{service.id}/detail',
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -148,7 +148,7 @@ class TestListService(api_base.FunctionalTest):
def test_sort_key_validation(self):
response = self.get_json(
'/services?sort_key=%s' % 'bad_name',
'/services?sort_key={}'.format('bad_name'),
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
@@ -164,7 +164,7 @@ class TestServicePolicyEnforcement(api_base.FunctionalTest):
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
f"Policy doesn't allow {rule} to be performed.",
jsonutils.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
@@ -176,7 +176,7 @@ class TestServicePolicyEnforcement(api_base.FunctionalTest):
service = obj_utils.create_test_service(self.context)
self._common_policy_check(
"service:get", self.get_json,
'/services/%s' % service.id,
f'/services/{service.id}',
expect_errors=True)
def test_policy_disallow_detail(self):

View File

@@ -52,7 +52,7 @@ class TestListStrategy(api_base.FunctionalTest):
]
mock_strategy_info.return_value = mock_state
response = self.get_json('/strategies/%s/state' % strategy.uuid)
response = self.get_json(f'/strategies/{strategy.uuid}/state')
strategy_name = [requirement["state"] for requirement in response
if requirement["type"] == "Name"][0]
self.assertEqual(strategy.name, strategy_name)
@@ -65,7 +65,7 @@ class TestListStrategy(api_base.FunctionalTest):
def test_get_one_by_uuid(self):
strategy = obj_utils.create_test_strategy(self.context)
response = self.get_json('/strategies/%s' % strategy.uuid)
response = self.get_json(f'/strategies/{strategy.uuid}')
self.assertEqual(strategy.uuid, response["uuid"])
self.assertEqual(strategy.name, response["name"])
self._assert_strategy_fields(response)
@@ -73,7 +73,7 @@ class TestListStrategy(api_base.FunctionalTest):
def test_get_one_by_name(self):
strategy = obj_utils.create_test_strategy(self.context)
response = self.get_json(urlparse.quote(
'/strategies/%s' % strategy['name']))
'/strategies/{}'.format(strategy['name'])))
self.assertEqual(strategy.uuid, response['uuid'])
self._assert_strategy_fields(response)
@@ -81,13 +81,13 @@ class TestListStrategy(api_base.FunctionalTest):
strategy = obj_utils.create_test_strategy(self.context)
strategy.soft_delete()
response = self.get_json(
'/strategies/%s' % strategy['uuid'],
'/strategies/{}'.format(strategy['uuid']),
headers={'X-Show-Deleted': 'True'})
self.assertEqual(strategy.uuid, response['uuid'])
self._assert_strategy_fields(response)
response = self.get_json(
'/strategies/%s' % strategy['uuid'],
'/strategies/{}'.format(strategy['uuid']),
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -103,7 +103,7 @@ class TestListStrategy(api_base.FunctionalTest):
def test_detail_against_single(self):
strategy = obj_utils.create_test_strategy(self.context)
response = self.get_json('/strategies/%s/detail' % strategy.uuid,
response = self.get_json(f'/strategies/{strategy.uuid}/detail',
expect_errors=True)
self.assertEqual(HTTPStatus.NOT_FOUND, response.status_int)
@@ -174,16 +174,16 @@ class TestListStrategy(api_base.FunctionalTest):
obj_utils.create_test_strategy(
self.context, id=id_,
uuid=utils.generate_uuid(),
name='Goal %s' % id_,
name=f'Goal {id_}',
goal_id=goal1['id'])
for id_ in range(3, 5):
obj_utils.create_test_strategy(
self.context, id=id_,
uuid=utils.generate_uuid(),
name='Goal %s' % id_,
name=f'Goal {id_}',
goal_id=goal2['id'])
response = self.get_json('/strategies/?goal=%s' % goal1['uuid'])
response = self.get_json('/strategies/?goal={}'.format(goal1['uuid']))
strategies = response['strategies']
self.assertEqual(2, len(strategies))
@@ -206,16 +206,16 @@ class TestListStrategy(api_base.FunctionalTest):
obj_utils.create_test_strategy(
self.context, id=id_,
uuid=utils.generate_uuid(),
name='Goal %s' % id_,
name=f'Goal {id_}',
goal_id=goal1['id'])
for id_ in range(3, 5):
obj_utils.create_test_strategy(
self.context, id=id_,
uuid=utils.generate_uuid(),
name='Goal %s' % id_,
name=f'Goal {id_}',
goal_id=goal2['id'])
response = self.get_json('/strategies/?goal=%s' % goal1['name'])
response = self.get_json('/strategies/?goal={}'.format(goal1['name']))
strategies = response['strategies']
self.assertEqual(2, len(strategies))
@@ -239,7 +239,7 @@ class TestListStrategy(api_base.FunctionalTest):
def test_sort_key_validation(self):
response = self.get_json(
'/strategies?sort_key=%s' % 'bad_name',
'/strategies?sort_key={}'.format('bad_name'),
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
@@ -260,7 +260,7 @@ class TestStrategyPolicyEnforcement(api_base.FunctionalTest):
self.assertEqual(HTTPStatus.FORBIDDEN, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
f"Policy doesn't allow {rule} to be performed.",
jsonutils.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
@@ -272,7 +272,7 @@ class TestStrategyPolicyEnforcement(api_base.FunctionalTest):
strategy = obj_utils.create_test_strategy(self.context)
self._common_policy_check(
"strategy:get", self.get_json,
'/strategies/%s' % strategy.uuid,
f'/strategies/{strategy.uuid}',
expect_errors=True)
def test_policy_disallow_detail(self):
@@ -285,7 +285,7 @@ class TestStrategyPolicyEnforcement(api_base.FunctionalTest):
strategy = obj_utils.create_test_strategy(self.context)
self._common_policy_check(
"strategy:get", self.get_json,
'/strategies/%s/state' % strategy.uuid,
f'/strategies/{strategy.uuid}/state',
expect_errors=True)

View File

@@ -34,7 +34,7 @@ class TestPost(api_base.FunctionalTest):
self.context,
audit_type=objects.audit.AuditType.EVENT.value)
response = self.post_json(
'/webhooks/%s' % audit['uuid'], {},
'/webhooks/{}'.format(audit['uuid']), {},
headers={'OpenStack-API-Version': 'infra-optim 1.4'})
self.assertEqual(HTTPStatus.ACCEPTED, response.status_int)
mock_trigger_audit.assert_called_once_with(
@@ -52,7 +52,7 @@ class TestPost(api_base.FunctionalTest):
def test_trigger_audit_with_not_allowed_audittype(self):
audit = obj_utils.create_test_audit(self.context)
response = self.post_json(
'/webhooks/%s' % audit['uuid'], {},
'/webhooks/{}'.format(audit['uuid']), {},
headers={'OpenStack-API-Version': 'infra-optim 1.4'},
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)
@@ -65,7 +65,7 @@ class TestPost(api_base.FunctionalTest):
audit_type=objects.audit.AuditType.EVENT.value,
state=objects.audit.State.FAILED)
response = self.post_json(
'/webhooks/%s' % audit['uuid'], {},
'/webhooks/{}'.format(audit['uuid']), {},
headers={'OpenStack-API-Version': 'infra-optim 1.4'},
expect_errors=True)
self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_int)

View File

@@ -357,8 +357,8 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
'state': objects.action.State.FAILED,
'uuid': actions[0].uuid,
'action_type': 'fake_action',
'status_message': "Action failed in execute: The action %s "
"execution failed." % actions[0].uuid,
'status_message': (f"Action failed in execute: The action "
f"{actions[0].uuid} execution failed."),
},
))
@@ -563,8 +563,8 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
self.assertEqual(
objects.Action.get_by_uuid(
self.context, action4.uuid).status_message,
"Action failed in execute: The action %s execution failed."
% action4.uuid)
f"Action failed in execute: The action {action4.uuid} "
"execution failed.")
# action failed in the post_condition phase
self.check_action_state(action5, objects.action.State.FAILED)
self.assertEqual(

View File

@@ -223,8 +223,7 @@ class TestTaskFlowActionContainer(base.DbTestCase):
cfg.CONF.set_override("rollback_when_actionplan_failed", False,
group="watcher_applier")
action_name = "action_type:{} uuid:{}".format(action.action_type,
action.uuid)
action_name = f"action_type:{action.action_type} uuid:{action.uuid}"
expected_log = ('Failed actionplan rollback option is turned off, '
'and the following action will be skipped: %s')
action_container.revert()
@@ -248,8 +247,7 @@ class TestTaskFlowActionContainer(base.DbTestCase):
cfg.CONF.set_override("rollback_when_actionplan_failed", True,
group="watcher_applier")
action_name = "action_type:{} uuid:{}".format(action.action_type,
action.uuid)
action_name = f"action_type:{action.action_type} uuid:{action.uuid}"
expected_log = 'Revert action: %s'
action_container.revert()
mock_log.warning.assert_called_once_with(expected_log, action_name)

View File

@@ -55,8 +55,8 @@ class TestLoader(base.TestCase):
fake_driver = drivermanager.DriverManager.make_test_instance(
extension=stevedore_extension.Extension(
name="fake",
entry_point="{}:{}".format(FakeLoadable.__module__,
FakeLoadable.__name__),
entry_point=(f"{FakeLoadable.__module__}:"
f"{FakeLoadable.__name__}"),
plugin=FakeLoadable,
obj=None),
namespace="TESTING")
@@ -81,8 +81,8 @@ class TestLoader(base.TestCase):
fake_driver = drivermanager.DriverManager.make_test_instance(
extension=stevedore_extension.Extension(
name="fake",
entry_point="{}:{}".format(FakeLoadableWithOpts.__module__,
FakeLoadableWithOpts.__name__),
entry_point=(f"{FakeLoadableWithOpts.__module__}:"
f"{FakeLoadableWithOpts.__name__}"),
plugin=FakeLoadableWithOpts,
obj=None),
namespace="TESTING")

View File

@@ -155,7 +155,7 @@ class TestPlacementHelper(base.TestCase):
result = self.client.get_inventories(rp_uuid)
expected_url = '/resource_providers/%s/inventories' % rp_uuid
expected_url = f'/resource_providers/{rp_uuid}/inventories'
self._assert_keystone_called_once(kss_req, expected_url, 'GET')
self.assertEqual(fake_inventories, result)
@@ -182,7 +182,7 @@ class TestPlacementHelper(base.TestCase):
result = self.client.get_provider_traits(rp_uuid)
expected_url = '/resource_providers/%s/traits' % rp_uuid
expected_url = f'/resource_providers/{rp_uuid}/traits'
self._assert_keystone_called_once(kss_req, expected_url, 'GET')
self.assertEqual(fake_traits, result)
@@ -224,7 +224,7 @@ class TestPlacementHelper(base.TestCase):
result = self.client.get_allocations_for_consumer(c_uuid)
expected_url = '/allocations/%s' % c_uuid
expected_url = f'/allocations/{c_uuid}'
self._assert_keystone_called_once(kss_req, expected_url, 'GET')
self.assertEqual(fake_allocations, result)
@@ -254,7 +254,7 @@ class TestPlacementHelper(base.TestCase):
result = self.client.get_usages_for_resource_provider(rp_uuid)
expected_url = '/resource_providers/%s/usages' % rp_uuid
expected_url = f'/resource_providers/{rp_uuid}/usages'
self._assert_keystone_called_once(kss_req, expected_url, 'GET')
self.assertEqual(fake_usages, result)
@@ -306,7 +306,7 @@ class TestPlacementHelper(base.TestCase):
result = self.client.get_candidate_providers(resources)
expected_url = "/allocation_candidates?%s" % resources
expected_url = f"/allocation_candidates?{resources}"
self._assert_keystone_called_once(kss_req, expected_url, 'GET')
self.assertEqual(fake_provider_summaries, result)

View File

@@ -55,8 +55,8 @@ class TestListOpts(base.TestCase):
# OptGroup object for some exceptions this is not possible but
# new groups should use OptGroup
raise Exception(
"Invalid option group: {} should be of type OptGroup not "
"string.".format(section_name))
f"Invalid option group: {section_name} should be of "
"type OptGroup not string.")
self.assertIn(section_name, expected_sections)
self.assertTrue(len(options))
@@ -77,9 +77,9 @@ class TestListOpts(base.TestCase):
fake_extmanager_call = extension.ExtensionManager.make_test_instance(
extensions=[extension.Extension(
name=fake_strategies.FakeDummy1Strategy2.get_name(),
entry_point="{}:{}".format(
fake_strategies.FakeDummy1Strategy2.__module__,
fake_strategies.FakeDummy1Strategy2.__name__),
entry_point=(
f"{fake_strategies.FakeDummy1Strategy2.__module__}:"
f"{fake_strategies.FakeDummy1Strategy2.__name__}"),
plugin=fake_strategies.FakeDummy1Strategy2,
obj=None,
)],
@@ -107,9 +107,9 @@ class TestListOpts(base.TestCase):
fake_extmanager_call = extension.ExtensionManager.make_test_instance(
extensions=[extension.Extension(
name=fake_strategies.FakeDummy1Strategy1.get_name(),
entry_point="{}:{}".format(
fake_strategies.FakeDummy1Strategy1.__module__,
fake_strategies.FakeDummy1Strategy1.__name__),
entry_point=(
f"{fake_strategies.FakeDummy1Strategy1.__module__}:"
f"{fake_strategies.FakeDummy1Strategy1.__name__}"),
plugin=fake_strategies.FakeDummy1Strategy1,
obj=None,
)],
@@ -142,9 +142,9 @@ class TestPlugins(base.TestCase):
fake_extmanager_call = extension.ExtensionManager.make_test_instance(
extensions=[extension.Extension(
name=fake_strategies.FakeDummy1Strategy1.get_name(),
entry_point="{}:{}".format(
fake_strategies.FakeDummy1Strategy1.__module__,
fake_strategies.FakeDummy1Strategy1.__name__),
entry_point=(
f"{fake_strategies.FakeDummy1Strategy1.__module__}:"
f"{fake_strategies.FakeDummy1Strategy1.__name__}"),
plugin=fake_strategies.FakeDummy1Strategy1,
obj=None,
)],

View File

@@ -236,7 +236,7 @@ class DbActionDescriptionTestCase(base.DbTestCase):
for i in range(1, 4):
action_desc = utils.create_test_action_desc(
id=i,
action_type="action_%s" % i,
action_type=f"action_{i}",
description=f"description_{i}")
ids.append(action_desc['id'])
action_descs = self.dbapi.get_action_description_list(self.context)

View File

@@ -39,7 +39,7 @@ class TestDbAuditFilters(base.DbTestCase):
self.audit_template_name = "Audit Template"
def gen_name():
return "Audit %s" % w_utils.generate_uuid()
return f"Audit {w_utils.generate_uuid()}"
self.audit1_name = gen_name()
self.audit2_name = gen_name()

View File

@@ -37,7 +37,7 @@ class TestDbAuditTemplateFilters(base.DbTestCase):
def _data_setup(self):
def gen_name():
return "Audit Template %s" % w_utils.generate_uuid()
return f"Audit Template {w_utils.generate_uuid()}"
self.audit_template1_name = gen_name()
self.audit_template2_name = gen_name()

View File

@@ -228,8 +228,8 @@ class DbGoalTestCase(base.DbTestCase):
goal = utils.create_test_goal(
id=i,
uuid=w_utils.generate_uuid(),
name="GOAL_%s" % i,
display_name='My Goal %s' % i)
name=f"GOAL_{i}",
display_name=f'My Goal {i}')
uuids.append(str(goal['uuid']))
goals = self.dbapi.get_goal_list(self.context)
goal_uuids = [g.uuid for g in goals]

View File

@@ -73,7 +73,7 @@ class TestPurgeCommand(base.DbTestCase):
seed += 1
def generate_unique_name(self, prefix):
return "{}{}".format(prefix, uuidutils.generate_uuid())
return f"{prefix}{uuidutils.generate_uuid()}"
def _data_setup(self):
# All the 1's are soft_deleted and are expired

View File

@@ -232,9 +232,9 @@ class DbScoringEngineTestCase(base.DbTestCase):
scoring_engine = utils.create_test_scoring_engine(
id=i,
uuid=w_utils.generate_uuid(),
name="SE_ID_%s" % i,
name=f"SE_ID_{i}",
description=f'My ScoringEngine {i}',
metainfo='a{0}=b{0}'.format(i))
metainfo=f'a{i}=b{i}')
names.append(str(scoring_engine['name']))
scoring_engines = self.dbapi.get_scoring_engine_list(self.context)
scoring_engines_names = [se.name for se in scoring_engines]

View File

@@ -233,7 +233,7 @@ class DbServiceTestCase(base.DbTestCase):
for i in range(1, 4):
service = utils.create_test_service(
id=i,
name="SERVICE_ID_%s" % i,
name=f"SERVICE_ID_{i}",
host=f"controller_{i}")
ids.append(service['id'])
services = self.dbapi.get_service_list(self.context)

View File

@@ -243,7 +243,7 @@ class DbStrategyTestCase(base.DbTestCase):
strategy = utils.create_test_strategy(
id=i,
uuid=w_utils.generate_uuid(),
name="STRATEGY_ID_%s" % i,
name=f"STRATEGY_ID_{i}",
display_name=f'My Strategy {i}')
uuids.append(str(strategy['uuid']))
strategies = self.dbapi.get_strategy_list(self.context)
@@ -260,7 +260,7 @@ class DbStrategyTestCase(base.DbTestCase):
strategy = utils.create_test_strategy(
id=i,
uuid=w_utils.generate_uuid(),
name="STRATEGY_ID_%s" % i,
name=f"STRATEGY_ID_{i}",
display_name=f'My Strategy {i}',
goal_id=goal.id)
uuids.append(str(strategy['uuid']))

View File

@@ -43,9 +43,9 @@ class TestClusterDataModelCollectorLoader(base.TestCase):
fake_driver_call = drivermanager.DriverManager.make_test_instance(
extension=stevedore_extension.Extension(
name=fake_driver,
entry_point="{}:{}".format(
faker_cluster_state.FakerModelCollector.__module__,
faker_cluster_state.FakerModelCollector.__name__),
entry_point=(
f"{faker_cluster_state.FakerModelCollector.__module__}:"
f"{faker_cluster_state.FakerModelCollector.__name__}"),
plugin=faker_cluster_state.FakerModelCollector,
obj=None,
),

View File

@@ -38,9 +38,9 @@ class TestDefaultStrategyLoader(base.TestCase):
fake_extmanager_call = extension.ExtensionManager.make_test_instance(
extensions=[extension.Extension(
name=dummy_strategy_name,
entry_point="{}:{}".format(
dummy_strategy.DummyStrategy.__module__,
dummy_strategy.DummyStrategy.__name__),
entry_point=(
f"{dummy_strategy.DummyStrategy.__module__}:"
f"{dummy_strategy.DummyStrategy.__name__}"),
plugin=dummy_strategy.DummyStrategy,
obj=None,
)],

View File

@@ -38,9 +38,7 @@ class TestDefaultGoalLoader(base.TestCase):
fake_extmanager_call = extension.ExtensionManager.make_test_instance(
extensions=[extension.Extension(
name=dummy_goal_name,
entry_point="{}:{}".format(
goals.Dummy.__module__,
goals.Dummy.__name__),
entry_point=f"{goals.Dummy.__module__}:{goals.Dummy.__name__}",
plugin=goals.Dummy,
obj=None,
)],

View File

@@ -220,7 +220,7 @@ class FakerStorageModelCollector(base.BaseClusterDataModelCollector):
volume_count = 9
for i in range(0, node_count):
host = "host_{0}@backend_{0}".format(i)
host = f"host_{i}@backend_{i}"
zone = f"zone_{i}"
volume_type = [f"type_{i}"]
node_attributes = {
@@ -234,7 +234,7 @@ class FakerStorageModelCollector(base.BaseClusterDataModelCollector):
model.add_node(node)
for j in range(0, pool_count):
name = "host_{0}@backend_{0}#pool_{1}".format(i, j)
name = f"host_{i}@backend_{i}#pool_{j}"
pool_attributes = {
"name": name,
"total_volumes": 2,

View File

@@ -175,7 +175,7 @@ class FakeGnocchiMetrics:
"""
resource = args[0]
uuid = "{}_{}".format(resource.uuid, resource.hostname)
uuid = f"{resource.uuid}_{resource.hostname}"
# Normalize
measurements = {}

View File

@@ -81,4 +81,4 @@ def get_policy_data(compat):
elif compat == 'juno':
return policy_data_compat_juno
else:
raise Exception('Policy data for %s not available' % compat)
raise Exception(f'Policy data for {compat} not available')