Refactor action execution reporter

* Moved away from using Oslo periodic tasks in the action execution
  reporter since in this case they don't make the code more readable.
  Also, now it is symmetric with other similar components like action
  execution checker.
* Refactored action execution checker w/o using classes since having
  many instances of it doesn't make sense.
* Small style changes

Change-Id: I9a97c40222e8dc4870c9b6a7c5f5e3c14f37bdd6
This commit is contained in:
Renat Akhmerov 2019-11-11 11:36:49 +07:00
parent 1c7e242975
commit 0e758e16e1
3 changed files with 82 additions and 62 deletions

View File

@ -39,14 +39,11 @@ class ExecutorServer(service_base.MistralService):
self.executor = executor
self._rpc_server = None
self._reporter = None
self._aer = None
def start(self):
super(ExecutorServer, self).start()
self._aer = action_execution_reporter.ActionExecutionReporter(CONF)
self._reporter = action_execution_reporter.setup(self._aer)
action_execution_reporter.start()
if self._setup_profiler:
profiler_utils.setup('mistral-executor', cfg.CONF.executor.host)
@ -63,8 +60,7 @@ class ExecutorServer(service_base.MistralService):
def stop(self, graceful=False):
super(ExecutorServer, self).stop(graceful)
if self._reporter:
self._reporter.stop(graceful)
action_execution_reporter.stop()
if self._rpc_server:
self._rpc_server.stop(graceful)
@ -101,7 +97,7 @@ class ExecutorServer(service_base.MistralService):
redelivered = rpc_ctx.redelivered or False
try:
self._aer.add_action_ex_id(action_ex_id)
action_execution_reporter.add_action_ex_id(action_ex_id)
res = self.executor.run_action(
action_ex_id,
@ -123,7 +119,7 @@ class ExecutorServer(service_base.MistralService):
return res
finally:
self._aer.remove_action_ex_id(action_ex_id)
action_execution_reporter.remove_action_ex_id(action_ex_id)
def get_oslo_service(setup_profiler=True):

View File

@ -89,7 +89,7 @@ def _loop():
except Exception:
LOG.exception(
'Action execution checker iteration failed'
' due to unexpected exception.'
' due to an unexpected exception.'
)
# For some mysterious reason (probably eventlet related)

View File

@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
import sys
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import periodic_task
from oslo_service import threadgroup
from mistral import context as auth_ctx
from mistral.rpc import clients as rpc
@ -25,69 +26,92 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class ActionExecutionReporter(periodic_task.PeriodicTasks):
"""The reporter that reports the running action executions."""
_enabled = False
_stopped = True
def __init__(self, conf):
super(ActionExecutionReporter, self).__init__(conf)
self._engine_client = rpc.get_engine_client()
self._running_actions = set()
self.interval = CONF.action_heartbeat.check_interval
self.max_missed = CONF.action_heartbeat.max_missed_heartbeats
self.enabled = self.interval and self.max_missed
_periodic_task = periodic_task.periodic_task(
spacing=self.interval,
run_immediately=True
)
self.add_periodic_task(
_periodic_task(report)
)
def add_action_ex_id(self, action_ex_id):
# With run-action there is no actions_ex_id assigned
if action_ex_id and self.enabled:
self._engine_client.report_running_actions([action_ex_id])
self._running_actions.add(action_ex_id)
def remove_action_ex_id(self, action_ex_id):
if action_ex_id and self.enabled:
self._running_actions.discard(action_ex_id)
_running_actions = set()
def report(reporter, ctx):
def add_action_ex_id(action_ex_id):
global _enabled
# With run-action there is no actions_ex_id assigned.
if action_ex_id and _enabled:
rpc.get_engine_client().report_running_actions([action_ex_id])
_running_actions.add(action_ex_id)
def remove_action_ex_id(action_ex_id):
global _enabled
if action_ex_id and _enabled:
_running_actions.discard(action_ex_id)
def report_running_actions():
LOG.debug("Running heartbeat reporter...")
if not reporter._running_actions:
global _running_actions
if not _running_actions:
return
auth_ctx.set_ctx(ctx)
reporter._engine_client.report_running_actions(reporter._running_actions)
rpc.get_engine_client().report_running_actions(_running_actions)
def setup(action_execution_reporter):
def _loop():
global _stopped
# This is an administrative thread so we need to set an admin
# security context.
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
tenant=None,
auth_token=None,
is_admin=True
)
)
while not _stopped:
try:
report_running_actions()
except Exception:
LOG.exception(
'Action execution reporter iteration failed'
' due to an unexpected exception.'
)
# For some mysterious reason (probably eventlet related)
# the exception is not cleared from the context automatically.
# This results in subsequent log.warning calls to show invalid
# info.
if sys.version_info < (3,):
sys.exc_clear()
eventlet.sleep(CONF.action_heartbeat.check_interval)
def start():
global _stopped, _enabled
interval = CONF.action_heartbeat.check_interval
max_missed = CONF.action_heartbeat.max_missed_heartbeats
enabled = interval and max_missed
if not enabled:
LOG.info("Action heartbeat reporting disabled.")
return None
tg = threadgroup.ThreadGroup()
_enabled = interval and max_missed
ctx = auth_ctx.MistralContext(
user=None,
tenant=None,
auth_token=None,
is_admin=True
)
if not _enabled:
LOG.info("Action heartbeat reporting is disabled.")
tg.add_dynamic_timer(
action_execution_reporter.run_periodic_tasks,
initial_delay=None,
periodic_interval_max=1,
context=ctx
)
return
return tg
_stopped = False
eventlet.spawn(_loop)
def stop(graceful=False):
global _stopped
_stopped = True