Riccardo Pittau ca7a46b113 Stop using six library
Since we've dropped support for Python 2.7, it's time to look at
the bright future that Python 3.x will bring and stop forcing
compatibility with older versions.
This patch removes the six library from requirements, not
looking back.

Change-Id: I4795417aa649be75ba7162a8cf30eacbb88c7b5e
2019-11-29 10:18:14 +01:00

327 lines
12 KiB
Python

# Copyright 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import functools
import inspect
import threading
from oslo_log import log
from oslo_utils import uuidutils
from ironic_python_agent import encoding
from ironic_python_agent import errors
LOG = log.getLogger()
class AgentCommandStatus(object):
"""Mapping of agent command statuses."""
RUNNING = u'RUNNING'
SUCCEEDED = u'SUCCEEDED'
FAILED = u'FAILED'
CLEAN_VERSION_MISMATCH = u'CLEAN_VERSION_MISMATCH'
class BaseCommandResult(encoding.SerializableComparable):
"""Base class for command result."""
serializable_fields = ('id', 'command_name', 'command_params',
'command_status', 'command_error', 'command_result')
def __init__(self, command_name, command_params):
"""Construct an instance of BaseCommandResult.
:param command_name: name of command executed
:param command_params: parameters passed to command
"""
self.id = uuidutils.generate_uuid()
self.command_name = command_name
self.command_params = command_params
self.command_status = AgentCommandStatus.RUNNING
self.command_error = None
self.command_result = None
def __str__(self):
return ("Command name: %(name)s, "
"params: %(params)s, status: %(status)s, result: "
"%(result)s." %
{"name": self.command_name, "params": self.command_params,
"status": self.command_status,
"result": self.command_result})
def is_done(self):
"""Checks to see if command is still RUNNING.
:returns: True if command is done, False if still RUNNING
"""
return self.command_status != AgentCommandStatus.RUNNING
def join(self):
""":returns: result of completed command."""
return self
class SyncCommandResult(BaseCommandResult):
"""A result from a command that executes synchronously."""
def __init__(self, command_name, command_params, success, result_or_error):
"""Construct an instance of SyncCommandResult.
:param command_name: name of command executed
:param command_params: parameters passed to command
:param success: True indicates success, False indicates failure
:param result_or_error: Contains the result (or error) from the command
"""
super(SyncCommandResult, self).__init__(command_name,
command_params)
if isinstance(result_or_error, (bytes, str)):
result_key = 'result' if success else 'error'
result_or_error = {result_key: result_or_error}
if success:
self.command_status = AgentCommandStatus.SUCCEEDED
self.command_result = result_or_error
else:
self.command_status = AgentCommandStatus.FAILED
self.command_error = result_or_error
class AsyncCommandResult(BaseCommandResult):
"""A command that executes asynchronously in the background."""
def __init__(self, command_name, command_params, execute_method,
agent=None):
"""Construct an instance of AsyncCommandResult.
:param command_name: name of command to execute
:param command_params: parameters passed to command
:param execute_method: a callable to be executed asynchronously
:param agent: Optional: an instance of IronicPythonAgent
"""
super(AsyncCommandResult, self).__init__(command_name, command_params)
self.agent = agent
self.execute_method = execute_method
self.command_state_lock = threading.Lock()
thread_name = 'agent-command-{}'.format(self.id)
self.execution_thread = threading.Thread(target=self.run,
name=thread_name)
def serialize(self):
"""Serializes the AsyncCommandResult into a dict.
:returns: dict containing serializable fields in AsyncCommandResult
"""
with self.command_state_lock:
return super(AsyncCommandResult, self).serialize()
def start(self):
"""Begin background execution of command."""
self.execution_thread.start()
return self
def join(self, timeout=None):
"""Block until command has completed, and return result.
:param timeout: float indicating max seconds to wait for command
to complete. Defaults to None.
"""
self.execution_thread.join(timeout)
return self
def is_done(self):
"""Checks to see if command is still RUNNING.
:returns: True if command is done, False if still RUNNING
"""
with self.command_state_lock:
return super(AsyncCommandResult, self).is_done()
def run(self):
"""Run a command."""
try:
result = self.execute_method(**self.command_params)
if isinstance(result, (bytes, str)):
result = {'result': '{}: {}'.format(self.command_name, result)}
LOG.info('Command: %(name)s, result: %(result)s',
{'name': self.command_name, 'result': result})
with self.command_state_lock:
self.command_result = result
self.command_status = AgentCommandStatus.SUCCEEDED
except errors.CleanVersionMismatch as e:
with self.command_state_lock:
self.command_error = e
self.command_status = AgentCommandStatus.CLEAN_VERSION_MISMATCH
self.command_result = None
LOG.error('Clean version mismatch for command %s',
self.command_name)
except Exception as e:
LOG.exception('Command failed: %(name)s, error: %(err)s',
{'name': self.command_name, 'err': e})
if not isinstance(e, errors.RESTError):
e = errors.CommandExecutionError(str(e))
with self.command_state_lock:
self.command_error = e
self.command_status = AgentCommandStatus.FAILED
finally:
if self.agent:
self.agent.force_heartbeat()
class BaseAgentExtension(object):
def __init__(self, agent=None):
super(BaseAgentExtension, self).__init__()
self.agent = agent
self.command_map = dict(
(v.command_name, v)
for k, v in inspect.getmembers(self)
if hasattr(v, 'command_name')
)
def execute(self, command_name, **kwargs):
cmd = self.command_map.get(command_name)
if cmd is None:
raise errors.InvalidCommandError(
'Unknown command: {}'.format(command_name))
return cmd(**kwargs)
def check_cmd_presence(self, ext_obj, ext, cmd):
if not (hasattr(ext_obj, 'execute') and hasattr(ext_obj, 'command_map')
and cmd in ext_obj.command_map):
raise errors.InvalidCommandParamsError(
"Extension {} doesn't provide {} method".format(ext, cmd))
class ExecuteCommandMixin(object):
def __init__(self):
self.command_lock = threading.Lock()
self.command_results = collections.OrderedDict()
self.ext_mgr = None
def get_extension(self, extension_name):
if self.ext_mgr is None:
raise errors.ExtensionError('Extension manager is not initialized')
ext = self.ext_mgr[extension_name].obj
ext.ext_mgr = self.ext_mgr
return ext
def split_command(self, command_name):
command_parts = command_name.split('.', 1)
if len(command_parts) != 2:
raise errors.InvalidCommandError(
'Command name must be of the form <extension>.<name>')
return (command_parts[0], command_parts[1])
def execute_command(self, command_name, **kwargs):
"""Execute an agent command."""
with self.command_lock:
LOG.debug('Executing command: %(name)s with args: %(args)s',
{'name': command_name, 'args': kwargs})
extension_part, command_part = self.split_command(command_name)
if len(self.command_results) > 0:
last_command = list(self.command_results.values())[-1]
if not last_command.is_done():
LOG.error('Tried to execute %(command)s, agent is still '
'executing %(last)s', {'command': command_name,
'last': last_command})
raise errors.CommandExecutionError('agent is busy')
try:
ext = self.get_extension(extension_part)
result = ext.execute(command_part, **kwargs)
except KeyError:
# Extension Not found
LOG.exception('Extension %s not found', extension_part)
raise errors.RequestedObjectNotFoundError('Extension',
extension_part)
except errors.InvalidContentError as e:
# Any command may raise a InvalidContentError which will be
# returned to the caller directly.
LOG.exception('Invalid content error: %s', e)
raise e
except Exception as e:
# Other errors are considered command execution errors, and are
# recorded as a failed SyncCommandResult with an error message
LOG.exception('Command execution error: %s', e)
result = SyncCommandResult(command_name, kwargs, False, e)
LOG.info('Command %(name)s completed: %(result)s',
{'name': command_name, 'result': result})
self.command_results[result.id] = result
return result
def async_command(command_name, validator=None):
"""Will run the command in an AsyncCommandResult in its own thread.
command_name is set based on the func name and command_params will
be whatever args/kwargs you pass into the decorated command.
Return values of type `str` or `unicode` are prefixed with the
`command_name` parameter when returned for consistency.
"""
def async_decorator(func):
func.command_name = command_name
@functools.wraps(func)
def wrapper(self, **command_params):
# Run a validator before passing everything off to async.
# validators should raise exceptions or return silently.
if validator:
validator(self, **command_params)
# bind self to func so that AsyncCommandResult doesn't need to
# know about the mode
bound_func = functools.partial(func, self)
return AsyncCommandResult(command_name,
command_params,
bound_func,
agent=self.agent).start()
return wrapper
return async_decorator
def sync_command(command_name, validator=None):
"""Decorate a method to wrap its return value in a SyncCommandResult.
For consistency with @async_command() can also accept a
validator which will be used to validate input, although a synchronous
command can also choose to implement validation inline.
"""
def sync_decorator(func):
func.command_name = command_name
@functools.wraps(func)
def wrapper(self, **command_params):
# Run a validator before invoking the function.
# validators should raise exceptions or return silently.
if validator:
validator(self, **command_params)
result = func(self, **command_params)
return SyncCommandResult(command_name,
command_params,
True,
result)
return wrapper
return sync_decorator